LangGraph Streaming Mode


Last updated 28 days ago

  • Author:

  • Peer Review:

  • Proofread :

  • This is a part of

Overview

This tutorial demonstrates LangGraph's streaming capabilities by building an AI news search system.

It covers three key streaming modes: values, updates, and messages, each serving different output monitoring needs.

The tutorial also explores advanced features including subgraphs and tag-based filtering for enhanced control over real-time AI outputs.

Table of Contents

References


Environment Setup

[Note]

%%capture --no-stderr
%pip install langchain-opentutorial
# Install required packages
from langchain_opentutorial import package

package.install(
    [
        "langchain",
        "langchain_core",
        "langchain_openai",
        "langgraph"
    ],
    verbose=False,
    upgrade=False,
)

You can set API keys in a .env file or set them manually.

[Note] If you’re not using the .env file, no worries! Just enter the keys directly in the cell below, and you’re good to go.

from dotenv import load_dotenv
from langchain_opentutorial import set_env

# Attempt to load environment variables from a .env file; if unsuccessful, set them manually.
if not load_dotenv():
    set_env(
        {
            "OPENAI_API_KEY": "",
            "LANGCHAIN_API_KEY": "",
            "LANGCHAIN_TRACING_V2": "true",
            "LANGCHAIN_ENDPOINT": "https://api.smith.langchain.com",
            "LANGCHAIN_PROJECT": "LangGraph Streaming Mode",
        }
    )

Introduction to Streaming Modes

LangGraph supports multiple streaming modes. The main ones are:

  • values: Streams the full state of the graph after each node is called.

  • updates: Streams only the update that each node makes to the state, after that node is called.

  • messages: This streaming mode streams LLM tokens from nodes as they are produced.
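The difference between values and updates can be sketched without LangGraph at all. The following is a plain-Python emulation under stated assumptions, not LangGraph's implementation: the `run` and `copy_state` helpers, the two toy nodes, and the append-style merge (standing in for a reducer such as add_messages) are all hypothetical names introduced for illustration.

```python
from typing import Callable, Dict, Iterator, List, Tuple

State = Dict[str, List[str]]
Node = Callable[[State], State]


def copy_state(state: State) -> State:
    """Return a copy so that earlier chunks are not mutated by later steps."""
    return {key: list(items) for key, items in state.items()}


def run(nodes: List[Tuple[str, Node]], inputs: State, stream_mode: str) -> Iterator[dict]:
    """Emulate step-by-step streaming over a linear list of (name, node) pairs."""
    if stream_mode not in ("values", "updates"):
        raise ValueError(f"unsupported stream_mode: {stream_mode}")
    state = copy_state(inputs)
    if stream_mode == "values":
        yield copy_state(state)  # values mode starts with the input state
    for name, node in nodes:
        update = node(state)
        for key, new_items in update.items():
            # Append-style merge, standing in for a reducer such as add_messages
            state[key] = state.get(key, []) + new_items
        if stream_mode == "values":
            yield copy_state(state)  # the full state after this node
        else:
            yield {name: update}  # only what this node changed


def search(state: State) -> State:
    return {"messages": ["tool: 2 headlines found"]}


def summarize(state: State) -> State:
    return {"messages": [f"ai: summary of {len(state['messages'])} messages"]}


nodes = [("search", search), ("summarize", summarize)]
values_chunks = list(run(nodes, {"messages": ["human: hi"]}, "values"))
updates_chunks = list(run(nodes, {"messages": ["human: hi"]}, "updates"))

print(values_chunks[-1])  # the whole conversation so far
print(updates_chunks[-1])  # only the last node's contribution, keyed by node name
```

In this sketch, values chunks grow with every step, while updates chunks stay small and tell you which node produced them.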

Defining the Graph

We'll create a simple agent that can search news and process the results.

First, we'll define a class to fetch Google News search results:

import feedparser
from urllib.parse import quote
from typing import List, Dict, Optional


class GoogleNews:
    """
    A class for searching and retrieving Google News results.
    """

    def __init__(self):
        """
        Initialize the GoogleNews class.
        Sets up the base_url attribute.
        """
        self.base_url = "https://news.google.com/rss"

    def _fetch_news(self, url: str, k: int = 3) -> List[Dict[str, str]]:
        """
        Fetch news from the given URL.

        Args:
            url (str): URL to fetch news from
            k (int): Maximum number of news items to fetch (default: 3)

        Returns:
            List[Dict[str, str]]: List of dictionaries containing news titles and links
        """
        news_data = feedparser.parse(url)
        return [
            {"title": entry.title, "link": entry.link}
            for entry in news_data.entries[:k]
        ]

    def _collect_news(self, news_list: List[Dict[str, str]]) -> List[Dict[str, str]]:
        """
        Process and return a list of news items.

        Args:
            news_list (List[Dict[str, str]]): List of dictionaries containing news information

        Returns:
            List[Dict[str, str]]: List of dictionaries containing URLs and content
        """
        if not news_list:
            print("No news found for this keyword.")
            return []

        result = []
        for news in news_list:
            result.append({"url": news["link"], "content": news["title"]})

        return result

    def search_by_keyword(
        self, keyword: Optional[str] = None, k: int = 3
    ) -> List[Dict[str, str]]:
        """
        Search news by keyword.

        Args:
            keyword (Optional[str]): Keyword to search for (default: None)
            k (int): Maximum number of news items to search for (default: 3)

        Returns:
            List[Dict[str, str]]: List of dictionaries containing URLs and content
        """
        if keyword:
            encoded_keyword = quote(keyword)
            url = f"{self.base_url}/search?q={encoded_keyword}&hl=en&gl=US&ceid=US:en"
        else:
            url = f"{self.base_url}?hl=en&gl=US&ceid=US:en"
        news_list = self._fetch_news(url, k)
        return self._collect_news(news_list)
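To see what URL the class ends up requesting, the URL-building step of search_by_keyword can be isolated. This is a minimal sketch that mirrors the logic above without touching the network; `build_feed_url` is a hypothetical helper name, not part of the tutorial's class.

```python
from typing import Optional
from urllib.parse import quote

BASE_URL = "https://news.google.com/rss"


def build_feed_url(keyword: Optional[str] = None) -> str:
    """Mirror the URL logic of search_by_keyword (hypothetical helper)."""
    if keyword:
        # quote() percent-encodes spaces and reserved characters in the keyword
        return f"{BASE_URL}/search?q={quote(keyword)}&hl=en&gl=US&ceid=US:en"
    # Without a keyword, fall back to the general feed
    return f"{BASE_URL}?hl=en&gl=US&ceid=US:en"


print(build_feed_url("latest AI news"))
# https://news.google.com/rss/search?q=latest%20AI%20news&hl=en&gl=US&ceid=US:en
print(build_feed_url())
# https://news.google.com/rss?hl=en&gl=US&ceid=US:en
```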

Now that we have our news fetching functionality, let's build the graph structure using LangGraph.

We'll create states, define tools, and establish the connections between different components:

from typing import Annotated, List, Dict
from typing_extensions import TypedDict
from langchain.tools import tool
from langchain_openai import ChatOpenAI
from langgraph.graph import StateGraph, START, END
from langgraph.graph.message import add_messages
from langgraph.prebuilt import ToolNode, tools_condition

########## 1. State Definition ##########
class State(TypedDict):
    messages: Annotated[list, add_messages]

########## 2. Tool Definition and Binding ##########
@tool
def search_keyword(query: str) -> str:
    """Look up news by keyword"""
    news_tool = GoogleNews()
    return "\n".join(
        [f'- {news["content"]}' for news in news_tool.search_by_keyword(query, k=5)]
    )

tools = [search_keyword]
llm = ChatOpenAI(model="gpt-4o-mini")
llm_with_tools = llm.bind_tools(tools)

########## 3. Add Nodes ##########
def chatbot(state: State):
    return {"messages": [llm_with_tools.invoke(state["messages"])]}

graph_builder = StateGraph(State)
graph_builder.add_node("chatbot", chatbot)
tool_node = ToolNode(tools=[search_keyword])
graph_builder.add_node("tools", tool_node)

# Add conditional edges
graph_builder.add_conditional_edges(
    "chatbot",
    tools_condition,
)

########## 4. Add Edges ##########
# tools > chatbot
graph_builder.add_edge("tools", "chatbot")
# START > chatbot
graph_builder.add_edge(START, "chatbot")
# chatbot > END
graph_builder.add_edge("chatbot", END)

# Compile graph
graph = graph_builder.compile()
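The State definition above attaches add_messages to the messages key through Annotated. The sketch below shows, in plain Python, how a function stored in Annotated metadata can be read back and used to merge an update into the state. It is an illustration only: `append_items` and `ToyState` are hypothetical names, and the real add_messages does more than list concatenation (for example, it also takes message IDs into account).

```python
from typing import Annotated, TypedDict, get_type_hints


def append_items(existing: list, new: list) -> list:
    """Toy reducer: append new items instead of overwriting the old list."""
    return existing + new


class ToyState(TypedDict):
    # The second argument of Annotated is metadata; here it names the reducer
    messages: Annotated[list, append_items]


# include_extras=True keeps the Annotated metadata instead of stripping it
hints = get_type_hints(ToyState, include_extras=True)
reducer = hints["messages"].__metadata__[0]

state = {"messages": ["human: hi"]}
update = {"messages": ["ai: hello"]}

# Merge the update into the state key by key, using the reducer
state = {key: reducer(state[key], value) for key, value in update.items()}
print(state)  # {'messages': ['human: hi', 'ai: hello']}
```

This is why a node can return only its new message and still leave the earlier conversation intact.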

Visualize the graph.

from IPython.display import display, Image

display(Image(graph.get_graph().draw_mermaid_png()))

Step-by-step output of a node

Streaming mode

  • values: Output the current state value at each step

  • updates: Output only the state updates at each step (default)

  • messages: Output messages from each step, including LLM tokens as they are produced

In values and updates modes, streaming means step-by-step output after each node, not token-by-token streaming of LLM output.

Values Mode (stream_mode="values")

The values mode streams the complete state after each node execution.

Each chunk is a dictionary that mirrors the State.

  • key: a key of the State

  • value: the current value stored under that key

Synchronous Streaming

inputs = {"messages": [("human", "Search for the latest AI news")]}

for chunk in graph.stream(inputs, stream_mode="values"):
    for state_key, state_value in chunk.items():
        if state_key == "messages":
            state_value[-1].pretty_print()
================================ Human Message =================================
    
    Search for the latest AI news
    ================================== Ai Message ==================================
    Tool Calls:
      search_keyword (call_9DHFR6fvFiXZxCDvJ81rHqlP)
     Call ID: call_9DHFR6fvFiXZxCDvJ81rHqlP
      Args:
        query: AI
    ================================= Tool Message =================================
    Name: search_keyword
    
    - Trump announces up to $500 billion in private sector AI infrastructure investment - CBS News
    - Trump Announces $100 Billion A.I. Initiative - The New York Times
    - Oracle shares rise 9% after Trump unveils $500 bln AI plan - Reuters
    - Google Invests Another $1 Billion in AI Developer Anthropic - Bloomberg
    - Trump announces $500 billion investment for A.I. infrastructure project - KTVU FOX 2 San Francisco
    ================================== Ai Message ==================================
    
    Here are the latest news articles related to AI:
    
    1. **Trump announces up to $500 billion in private sector AI infrastructure investment** - CBS News
    2. **Trump Announces $100 Billion A.I. Initiative** - The New York Times
    3. **Oracle shares rise 9% after Trump unveils $500 billion AI plan** - Reuters
    4. **Google Invests Another $1 Billion in AI Developer Anthropic** - Bloomberg
    5. **Trump announces $500 billion investment for A.I. infrastructure project** - KTVU FOX 2 San Francisco
    
    If you need more details or specific information about any of these articles, feel free to ask!

Asynchronous Streaming

The astream() method runs the graph asynchronously and yields chunks, here in values mode.

Iterate over it with an async for statement.

inputs = {"messages": [("human", "Search for the latest AI news")]}

async for chunk in graph.astream(inputs, stream_mode="values"):
    for state_key, state_value in chunk.items():
        if state_key == "messages":
            state_value[-1].pretty_print()
================================ Human Message =================================
    
    Search for the latest AI news
    ================================== Ai Message ==================================
    Tool Calls:
      search_keyword (call_TCw7EC7kZ7pDeI1zUQNUlWfu)
     Call ID: call_TCw7EC7kZ7pDeI1zUQNUlWfu
      Args:
        query: AI
    ================================= Tool Message =================================
    Name: search_keyword
    
    - Trump announces up to $500 billion in private sector AI infrastructure investment - CBS News
    - Trump Announces $100 Billion A.I. Initiative - The New York Times
    - Oracle shares rise 9% after Trump unveils $500 bln AI plan - Reuters
    - Google Invests Another $1 Billion in AI Developer Anthropic - Bloomberg
    - Trump announces $500 billion investment for A.I. infrastructure project - KTVU FOX 2 San Francisco
    ================================== Ai Message ==================================
    
    Here are the latest news articles related to AI:
    
    1. **Trump announces up to $500 billion in private sector AI infrastructure investment** - CBS News
    2. **Trump Announces $100 Billion A.I. Initiative** - The New York Times
    3. **Oracle shares rise 9% after Trump unveils $500 bln AI plan** - Reuters
    4. **Google Invests Another $1 Billion in AI Developer Anthropic** - Bloomberg
    5. **Trump announces $500 billion investment for A.I. infrastructure project** - KTVU FOX 2 San Francisco
    
    If you would like more details on any of these articles, just let me know!
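A top-level async for works in a notebook because the notebook already runs an event loop. In a plain Python script it is a syntax error, so the loop has to live inside a coroutine that is started with asyncio.run(). The sketch below shows that shape using a stand-in async generator; `fake_astream` is a hypothetical replacement for graph.astream so the example runs without a graph or an API key.

```python
import asyncio
from typing import AsyncIterator, List


async def fake_astream(inputs: dict) -> AsyncIterator[dict]:
    """Hypothetical stand-in for graph.astream(inputs, stream_mode="values")."""
    messages = list(inputs["messages"])
    yield {"messages": list(messages)}  # values mode starts with the input state
    for reply in ("ai: calling search_keyword", "tool: - headline", "ai: summary"):
        await asyncio.sleep(0)  # hand control back to the event loop
        messages.append(reply)
        yield {"messages": list(messages)}


async def main() -> List[str]:
    latest = []
    async for chunk in fake_astream({"messages": ["human: Search for the latest AI news"]}):
        # Same pattern as the tutorial: look at the newest message in each chunk
        latest.append(chunk["messages"][-1])
    return latest


# In a script, asyncio.run() creates the event loop and runs main() to completion.
# Inside a notebook, use `await main()` instead, because a loop is already running.
latest_messages = asyncio.run(main())
print(latest_messages)
```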

If you only want to see the final result, do the following:

inputs = {"messages": [("human", "Search for the latest AI news")]}

final_result = None

async for chunk in graph.astream(inputs, stream_mode="values"):
    final_result = chunk

# Display the final result
print(final_result["messages"][-1].content)
Here are the latest news articles related to AI:
    
    1. **Trump announces up to $500 billion in private sector AI infrastructure investment** - CBS News
    2. **Trump Announces $100 Billion A.I. Initiative** - The New York Times
    3. **Oracle shares rise 9% after Trump unveils $500 bln AI plan** - Reuters
    4. **Google Invests Another $1 Billion in AI Developer Anthropic** - Bloomberg
    5. **Trump announces $500 billion investment for A.I. infrastructure project** - KTVU FOX 2 San Francisco
    
    If you would like more details on any of these articles, let me know!
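For a synchronous stream, the "keep only the last chunk" pattern can also be written with a bounded deque. This is a small sketch with a hypothetical `last_chunk` helper, shown on a hand-written list of chunks rather than a real graph; it works for any regular iterable, such as the result of graph.stream(), but not for an async iterator.

```python
from collections import deque
from typing import Iterable, Optional, TypeVar

T = TypeVar("T")


def last_chunk(stream: Iterable[T]) -> Optional[T]:
    """Consume the whole stream and return its final item, or None if it is empty."""
    tail = deque(stream, maxlen=1)  # maxlen=1 discards everything but the newest item
    return tail[0] if tail else None


# Hand-written chunks standing in for values-mode output
chunks = [
    {"messages": ["human: hi"]},
    {"messages": ["human: hi", "ai: hello"]},
]
final_result = last_chunk(chunks)
print(final_result["messages"][-1])  # ai: hello
```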

Updates Mode (stream_mode="updates")

The updates mode streams only the changes to the state after each node execution.

Each chunk is a dictionary keyed by node name.

  • key: the name of the node

  • value: the output of that node step, that is, a dictionary of the state keys the node updated and their new values
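Because each updates chunk carries only one node's contribution, the full state can be rebuilt by folding the chunks together. The sketch below does this on hand-written chunks; `apply_updates` is a hypothetical helper, and plain list concatenation is assumed in place of the add_messages reducer.

```python
from typing import Dict, List, Tuple

State = Dict[str, List[str]]

# Hand-written chunks in the shape described above: {node_name: {state_key: new_items}}
update_chunks = [
    {"chatbot": {"messages": ["ai: tool call search_keyword(query='AI')"]}},
    {"tools": {"messages": ["tool: - headline one"]}},
    {"chatbot": {"messages": ["ai: Here is the latest AI news."]}},
]


def apply_updates(initial: State, chunks: List[dict]) -> Tuple[State, List[str]]:
    """Fold updates-mode chunks into a full state and record which nodes ran."""
    state = {key: list(items) for key, items in initial.items()}
    trace = []
    for chunk in chunks:
        for node, update in chunk.items():
            trace.append(node)
            for key, items in update.items():
                # Assumed merge rule: append, as a message-list reducer would
                state[key] = state.get(key, []) + list(items)
    return state, trace


final_state, node_trace = apply_updates({"messages": ["human: hi"]}, update_chunks)
print(node_trace)  # ['chatbot', 'tools', 'chatbot']
print(len(final_state["messages"]))  # 4
```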

Synchronous Streaming

inputs = {"messages": [("human", "Search for the latest AI news")]}

for chunk in graph.stream(inputs, stream_mode="updates"):
    for node, value in chunk.items():
        if node:
            print(f"\n[Node: {node}]\n")
        if "messages" in value:
            value["messages"][-1].pretty_print()
    [Node: chatbot]
    
    ================================== Ai Message ==================================
    Tool Calls:
      search_keyword (call_lLAPc2hQKwCvb9EEW0PrUFsU)
     Call ID: call_lLAPc2hQKwCvb9EEW0PrUFsU
      Args:
        query: AI
    
    [Node: tools]
    
    ================================= Tool Message =================================
    Name: search_keyword
    
    - Trump announces up to $500 billion in private sector AI infrastructure investment - CBS News
    - Trump Announces $100 Billion A.I. Initiative - The New York Times
    - Oracle shares rise 9% after Trump unveils $500 bln AI plan - Reuters
    - Google Invests Another $1 Billion in AI Developer Anthropic - Bloomberg
    - Trump announces $500 billion investment for A.I. infrastructure project - KTVU FOX 2 San Francisco
    
    [Node: chatbot]
    
    ================================== Ai Message ==================================
    
    Here are the latest news articles related to AI:
    
    1. **Trump announces up to $500 billion in private sector AI infrastructure investment** - CBS News
    2. **Trump Announces $100 Billion A.I. Initiative** - The New York Times
    3. **Oracle shares rise 9% after Trump unveils $500 bln AI plan** - Reuters
    4. **Google Invests Another $1 Billion in AI Developer Anthropic** - Bloomberg
    5. **Trump announces $500 billion investment for A.I. infrastructure project** - KTVU FOX 2 San Francisco
    
    If you need more details on any of these articles, let me know!

Asynchronous Streaming

inputs = {"messages": [("human", "Search for the latest AI news")]}

async for chunk in graph.astream(inputs, stream_mode="updates"):
    for node, value in chunk.items():
        if node:
            print(f"\n[Node: {node}]\n")
        if "messages" in value:
            value["messages"][-1].pretty_print()
    [Node: chatbot]
    
    ================================== Ai Message ==================================
    Tool Calls:
      search_keyword (call_Jq43xxwMWav17MnpzrIxd9xZ)
     Call ID: call_Jq43xxwMWav17MnpzrIxd9xZ
      Args:
        query: AI
    
    [Node: tools]
    
    ================================= Tool Message =================================
    Name: search_keyword
    
    - Trump announces up to $500 billion in private sector AI infrastructure investment - CBS News
    - Trump Announces $100 Billion A.I. Initiative - The New York Times
    - Oracle shares rise 9% after Trump unveils $500 bln AI plan - Reuters
    - Google Invests Another $1 Billion in AI Developer Anthropic - Bloomberg
    - Trump announces $500 billion investment for A.I. infrastructure project - KTVU FOX 2 San Francisco
    
    [Node: chatbot]
    
    ================================== Ai Message ==================================
    
    Here are some of the latest news articles related to AI:
    
    1. **Trump announces up to $500 billion in private sector AI infrastructure investment** - CBS News
    2. **Trump Announces $100 Billion A.I. Initiative** - The New York Times
    3. **Oracle shares rise 9% after Trump unveils $500 bln AI plan** - Reuters
    4. **Google Invests Another $1 Billion in AI Developer Anthropic** - Bloomberg
    5. **Trump announces $500 billion investment for A.I. infrastructure project** - KTVU FOX 2 San Francisco
    
    If you would like more details about any specific article, let me know!

Messages Mode (stream_mode="messages")

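The messages mode streams messages from each node as they are produced: LLM output arrives token by token as message chunks, and other messages, such as tool results, arrive whole.

A chunk is a tuple with two elements.

  • chunk_msg: the message (or message chunk) emitted in real time

  • metadata: information about the node that produced it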

Synchronous Streaming

inputs = {"messages": [("human", "Search for the latest AI news")]}

for chunk_msg, metadata in graph.stream(inputs, stream_mode="messages"):
    if metadata["langgraph_node"] == "chatbot":
        if chunk_msg.content:
            print(chunk_msg.content, end="", flush=True)
    else:
        print(chunk_msg.content)
        print(f"\n\nmetadata: \n{metadata}\n\n")
- Trump announces up to $500 billion in private sector AI infrastructure investment - CBS News
    - Trump Announces $100 Billion A.I. Initiative - The New York Times
    - Oracle shares rise 9% after Trump unveils $500 bln AI plan - Reuters
    - Google Invests Another $1 Billion in AI Developer Anthropic - Bloomberg
    - Trump announces $500 billion investment for A.I. infrastructure project - KTVU FOX 2 San Francisco
    
    
    metadata: 
    {'langgraph_step': 2, 'langgraph_node': 'tools', 'langgraph_triggers': ['branch:chatbot:tools_condition:tools'], 'langgraph_path': ('__pregel_pull', 'tools'), 'langgraph_checkpoint_ns': 'tools:81eab6e0-0838-3cdf-6f0f-c63d6ac965a2'}
    
    
    Here are the latest news articles related to AI:
    
    1. **Trump announces up to $500 billion in private sector AI infrastructure investment** - CBS News
    2. **Trump Announces $100 Billion A.I. Initiative** - The New York Times
    3. **Oracle shares rise 9% after Trump unveils $500 bln AI plan** - Reuters
    4. **Google Invests Another $1 Billion in AI Developer Anthropic** - Bloomberg
    5. **Trump announces $500 billion investment for A.I. infrastructure project** - KTVU FOX 2 San Francisco
    
    Let me know if you need more information on any of these articles!

Asynchronous Streaming

inputs = {"messages": [("human", "Search for the latest AI news")]}

async for chunk_msg, metadata in graph.astream(inputs, stream_mode="messages"):
    if metadata["langgraph_node"] == "chatbot":
        if chunk_msg.content:
            print(chunk_msg.content, end="", flush=True)
    else:
        print(chunk_msg.content)
        print(f"\n\nmetadata: \n{metadata}\n\n")
- Trump announces up to $500 billion in private sector AI infrastructure investment - CBS News
    - Trump Announces $100 Billion A.I. Initiative - The New York Times
    - Oracle shares rise 9% after Trump unveils $500 bln AI plan - Reuters
    - Google Invests Another $1 Billion in AI Developer Anthropic - Bloomberg
    - Trump announces $500 billion investment for A.I. infrastructure project - KTVU FOX 2 San Francisco
    
    
    metadata: 
    {'langgraph_step': 2, 'langgraph_node': 'tools', 'langgraph_triggers': ['branch:chatbot:tools_condition:tools'], 'langgraph_path': ('__pregel_pull', 'tools'), 'langgraph_checkpoint_ns': 'tools:036f524f-0b7b-712f-c1c1-8219277f41c2'}
    
    
    Here are the latest news articles related to AI:
    
    1. **Trump announces up to $500 billion in private sector AI infrastructure investment** - CBS News
    2. **Trump Announces $100 Billion A.I. Initiative** - The New York Times
    3. **Oracle shares rise 9% after Trump unveils $500 bln AI plan** - Reuters
    4. **Google Invests Another $1 Billion in AI Developer Anthropic** - Bloomberg
    5. **Trump announces $500 billion investment for A.I. infrastructure project** - KTVU FOX 2 San Francisco
    
    Let me know if you need more details on any of these articles!

Advanced Streaming Features

Streaming output to a specific node

To stream output from one specific node only, use stream_mode="messages" and filter on the metadata.

When setting stream_mode="messages", you will receive messages in the form of (chunk_msg, metadata).

  • chunk_msg: the real-time output message

  • metadata: the node information

You can use metadata["langgraph_node"] to output only messages from a specific node.

from langchain_core.messages import HumanMessage

inputs = {"messages": [("human", "Search for the latest AI news")]}

for chunk_msg, metadata in graph.stream(inputs, stream_mode="messages"):
    # Print only non-empty, non-human content produced by the chatbot node
    if (
        chunk_msg.content
        and not isinstance(chunk_msg, HumanMessage)
        and metadata["langgraph_node"] == "chatbot"
    ):
        print(chunk_msg.content, end="", flush=True)
Here are the latest news articles related to AI:
    
    1. **Trump announces up to $500 billion in private sector AI infrastructure investment** - CBS News
    2. **Trump Announces $100 Billion A.I. Initiative** - The New York Times
    3. **Oracle shares rise 9% after Trump unveils $500 bln AI plan** - Reuters
    4. **Google Invests Another $1 Billion in AI Developer Anthropic** - Bloomberg
    5. **Trump announces $500 billion investment for A.I. infrastructure project** - KTVU FOX 2 San Francisco
    
    If you would like more details on any specific article, let me know!
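The node filter can be factored into a small reusable generator. The following sketch runs on hand-written (chunk_msg, metadata) tuples rather than a real graph: `tokens_from` is a hypothetical helper, and SimpleNamespace objects stand in for message chunks, with only the content attribute assumed.

```python
from types import SimpleNamespace
from typing import Iterable, Iterator, Tuple


def tokens_from(stream: Iterable[Tuple[object, dict]], node: str) -> Iterator[str]:
    """Yield non-empty content only for chunks produced by the given node."""
    for chunk_msg, metadata in stream:
        if metadata.get("langgraph_node") == node and chunk_msg.content:
            yield chunk_msg.content


# Stand-in for graph.stream(inputs, stream_mode="messages")
fake_stream = [
    (SimpleNamespace(content=""), {"langgraph_node": "chatbot"}),  # empty tool-call chunk
    (SimpleNamespace(content="- headline"), {"langgraph_node": "tools"}),
    (SimpleNamespace(content="Here are "), {"langgraph_node": "chatbot"}),
    (SimpleNamespace(content="the headlines."), {"langgraph_node": "chatbot"}),
]

text = "".join(tokens_from(fake_stream, "chatbot"))
print(text)  # Here are the headlines.
```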

You can see the node information by outputting metadata.

metadata
{'langgraph_step': 3,
     'langgraph_node': 'chatbot',
     'langgraph_triggers': ['tools'],
     'langgraph_path': ('__pregel_pull', 'chatbot'),
     'langgraph_checkpoint_ns': 'chatbot:7755084b-57f3-4f12-44b2-fad575fb994f',
     'checkpoint_ns': 'chatbot:7755084b-57f3-4f12-44b2-fad575fb994f',
     'ls_provider': 'openai',
     'ls_model_name': 'gpt-4o-mini',
     'ls_model_type': 'chat',
     'ls_temperature': 0.7}
from IPython.display import display, Image

display(Image(graph.get_graph().draw_mermaid_png()))

Filtering with Tags

If LLM output comes from multiple places in your graph, you may want to stream only the output of one specific model call.

In that case, tag the model and keep only the events that carry the tag.

Here's how to add tags to your LLM. Tags are passed as a list.

llm_with_tools = llm.bind_tools(tools).with_config(tags=["WANT_TO_STREAM"])

This allows you to filter events more precisely, keeping only events that occurred in that model.

The example below outputs only if the WANT_TO_STREAM tag is present.

async for event in graph.astream_events(inputs, version="v2"):
    kind = event["event"]
    tags = event.get("tags", [])
    
    # Keep only chat model stream events that carry the WANT_TO_STREAM tag
    if kind == "on_chat_model_stream" and "WANT_TO_STREAM" in tags:
        data = event["data"]
        
        if data["chunk"].content:
            print(data["chunk"].content, end="", flush=True)
Here are some of the latest news articles related to AI:
    
    1. **US tech giants announce AI plan worth up to $500bn** - BBC.com
    2. **Trump announces up to $500 billion in private sector AI infrastructure investment** - CBS News
    3. **Google Invests Another $1 Billion in AI Developer Anthropic** - Bloomberg
    4. **Trump announces $500 billion investment for A.I. infrastructure project** - KTVU FOX 2 San Francisco
    5. **Trump Announces $100 Billion A.I. Initiative** - The New York Times
    
    If you would like more details on any specific article, let me know!
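The tag filter itself is ordinary dictionary inspection, which can be checked on hand-written events. In this sketch the event dictionaries only imitate the fields used above ("event", "tags", "data"); `tagged_tokens` is a hypothetical helper, OTHER_MODEL is a made-up tag, and SimpleNamespace stands in for the streamed chunk objects.

```python
from types import SimpleNamespace
from typing import Iterable, Iterator


def tagged_tokens(events: Iterable[dict], tag: str) -> Iterator[str]:
    """Yield token text from chat model stream events that carry the given tag."""
    for event in events:
        if event["event"] != "on_chat_model_stream":
            continue  # not a token event
        if tag not in event.get("tags", []):
            continue  # token from a model we do not want to stream
        content = event["data"]["chunk"].content
        if content:
            yield content


fake_events = [
    {"event": "on_chain_start", "tags": [], "data": {}},
    {"event": "on_chat_model_stream", "tags": ["WANT_TO_STREAM"],
     "data": {"chunk": SimpleNamespace(content="Here are ")}},
    {"event": "on_chat_model_stream", "tags": ["OTHER_MODEL"],
     "data": {"chunk": SimpleNamespace(content="(ignored)")}},
    {"event": "on_chat_model_stream", "tags": ["WANT_TO_STREAM"],
     "data": {"chunk": SimpleNamespace(content="the headlines.")}},
]

streamed_text = "".join(tagged_tokens(fake_events, "WANT_TO_STREAM"))
print(streamed_text)  # Here are the headlines.
```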

Tool Call Streaming

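  • AIMessageChunk: A message chunk streamed in real time, one token-sized piece at a time.

  • tool_call_chunks: The partial tool call data carried by a chunk. When a chunk has tool_call_chunks, add it to the chunks gathered so far and print the accumulated tool call arguments. (A chunk that carries tool call tokens is identified by this property.)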

from langchain_core.messages import AIMessageChunk, HumanMessage

inputs = {"messages": [("human", "Search for the latest AI news")]}

# Flag marking whether the next AIMessageChunk is the first one received
first = True

for msg, metadata in graph.stream(inputs, stream_mode="messages"):
    # Handle content output when it's not a user message
    if msg.content and not isinstance(msg, HumanMessage):
        print(msg.content, end="", flush=True)

    if isinstance(msg, AIMessageChunk):
        if first:
            gathered = msg
            first = False
        else:
            gathered = gathered + msg

        # Print accumulated tool call information when a tool call chunk exists
        if msg.tool_call_chunks:
            print(gathered.tool_calls[0]["args"])
{}
    {}
    {}
    {'query': ''}
    {'query': 'AI'}
    {'query': 'AI'}
    - Trump announces up to $500 billion in private sector AI infrastructure investment - CBS News
    - Trump Announces $100 Billion A.I. Initiative - The New York Times
    - Oracle shares rise 9% after Trump unveils $500 bln AI plan - Reuters
    - Google Invests Another $1 Billion in AI Developer Anthropic - Bloomberg
    - Trump announces $500 billion investment for A.I. infrastructure project - KTVU FOX 2 San FranciscoHere are the latest news articles related to AI:
    
    1. **Trump announces up to $500 billion in private sector AI infrastructure investment** - CBS News
    2. **Trump Announces $100 Billion A.I. Initiative** - The New York Times
    3. **Oracle shares rise 9% after Trump unveils $500 billion AI plan** - Reuters
    4. **Google Invests Another $1 Billion in AI Developer Anthropic** - Bloomberg
    5. **Trump announces $500 billion investment for A.I. infrastructure project** - KTVU FOX 2 San Francisco
    
    Let me know if you need more information on any of these articles!

Working with Subgraphs

In this part, we'll learn how to structure a graph using subgraphs.

A subgraph is a compiled graph that is added to another graph as a single node, which lets you define part of a workflow separately and reuse it.

Example Flow

  1. The subgraph reuses the existing ability to search for the latest news.

  2. The parent graph adds the ability to generate a social media post from the retrieved news.

from typing import Annotated, List, Dict
from typing_extensions import TypedDict
from langchain.tools import tool
from langchain_openai import ChatOpenAI
from langgraph.graph import StateGraph, START, END
from langgraph.graph.message import add_messages
from langgraph.prebuilt import ToolNode, tools_condition


########## 1. State Definition ##########
class State(TypedDict):
    # Add message list annotation
    messages: Annotated[list, add_messages]


########## 2. Tool Definition and Binding ##########
@tool
def search_keyword(query: str) -> str:
    """Look up news by keyword"""
    news_tool = GoogleNews()
    return "\n".join([f'- {news["content"]}' for news in news_tool.search_by_keyword(query, k=5)])


# Create tool list
tools = [search_keyword]

# Initialize LLM
llm = ChatOpenAI(model="gpt-4o-mini")

# Combine tools with LLM (add tags)
llm_with_tools = llm.bind_tools(tools).with_config(tags=["WANT_TO_STREAM"])

########## 3. Add Nodes ##########
def chatbot(state: State):
    return {"messages": [llm_with_tools.invoke(state["messages"])]}

def create_sns_post(state: State):
    sns_prompt = """
Please convert the previous conversation into a social media post format.
Follow these guidelines:
- Include hashtags
- Use emojis
- Use concise and engaging writing style
- Keep it within 200 characters
"""
    messages = state["messages"] + [("human", sns_prompt)]
    sns_llm = ChatOpenAI(model="gpt-4o-mini").with_config(tags=["WANT_TO_STREAM2"])
    return {"messages": [sns_llm.invoke(messages)]}


def create_subgraph():
    # Create state graph for subgraph
    subgraph = StateGraph(State)

    subgraph.add_node("chatbot", chatbot)

    # Create and add tool node
    tool_node = ToolNode(tools=[search_keyword])
    subgraph.add_node("tools", tool_node)

    subgraph.add_conditional_edges(
        "chatbot",
        tools_condition,
    )

    # tools > chatbot
    subgraph.add_edge("tools", "chatbot")

    # START > chatbot
    subgraph.add_edge(START, "chatbot")

    # chatbot > END 
    subgraph.add_edge("chatbot", END)

    return subgraph.compile()

# Create main graph
graph_builder = StateGraph(State)

# Add subgraph
subgraph = create_subgraph()
graph_builder.add_node("news_subgraph", subgraph)

# Add SNS post creation node
graph_builder.add_node("sns_post", create_sns_post)

# START > news_subgraph
graph_builder.add_edge(START, "news_subgraph")

# news_subgraph > sns_post
graph_builder.add_edge("news_subgraph", "sns_post")

# sns_post > END
graph_builder.add_edge("sns_post", END)

# Compile graph
graph = graph_builder.compile()

We can visualize the flow inside the subgraph by passing xray=True to get_graph().

from IPython.display import display, Image

display(Image(graph.get_graph(xray=True).draw_mermaid_png()))

Now let's see how our graph processes and outputs the results when searching for AI news.

inputs = {"messages": [("human", "Search for the latest AI news")]}

# Process and output node updates sequentially
for chunk in graph.stream(inputs, stream_mode="updates"):
    # node_name: current processing node name
    # node_chunk: chunk data for that node
    for node_name, node_chunk in chunk.items():
        print(f"\n========= Update from node {node_name} =========\n")
        # Output updated data for the node
        if "messages" in node_chunk:
            node_chunk["messages"][-1].pretty_print()
        else:
            print(node_chunk)
    ========= Update from node news_subgraph =========
    
    ================================== Ai Message ==================================
    
    Here are the latest news articles related to AI:
    
    1. **Trump announces up to $500 billion in private sector AI infrastructure investment** - CBS News
    2. **Trump Announces $100 Billion A.I. Initiative** - The New York Times
    3. **Oracle shares rise 9% after Trump unveils $500 bln AI plan** - Reuters
    4. **Google Invests Another $1 Billion in AI Developer Anthropic** - Bloomberg
    5. **Trump announces $500 billion investment for A.I. infrastructure project** - KTVU FOX 2 San Francisco
    
    If you would like more details about any of these articles, let me know!
    
    ========= Update from node sns_post =========
    
    ================================== Ai Message ==================================
    
    🚀 Big news in AI! Trump just announced a massive $500 billion investment for AI infrastructure! 💰 Plus, Google drops another $1B on Anthropic! Exciting times ahead! 🤖✨ #AI #Innovation #TechNews

Including Subgraph Output

To also receive updates from the nodes inside a subgraph, pass subgraphs=True.

Each streamed item is then a (namespace, chunk) tuple.
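As a rough illustration of that shape, the sketch below groups node names by the graph they ran in. It runs on hand-written tuples rather than real stream output: the `graph_name` helper, the sample items, and the ID after the colon are all made up for the example. It assumes, as the formatting function in this section does, that the parent graph reports an empty namespace.

```python
from collections import defaultdict


def graph_name(namespace: tuple) -> str:
    """Return the innermost graph name, or 'parent graph' for an empty tuple."""
    return namespace[-1].split(":")[0] if namespace else "parent graph"


# Hand-written stand-ins for streamed (namespace, chunk) items; IDs are made up
items = [
    (("news_subgraph:1234",), {"chatbot": {"messages": ["tool call"]}}),
    (("news_subgraph:1234",), {"tools": {"messages": ["search results"]}}),
    ((), {"news_subgraph": {"messages": ["summary"]}}),
    ((), {"sns_post": {"messages": ["post"]}}),
]

nodes_by_graph = defaultdict(list)
for namespace, chunk in items:
    for node_name in chunk:
        nodes_by_graph[graph_name(namespace)].append(node_name)

print(dict(nodes_by_graph))
```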

inputs = {"messages": [("human", "Search for the latest AI news")]}

# Convert a namespace tuple to a readable graph name
def format_namespace(namespace):
    return (
        namespace[-1].split(":")[0]
        if len(namespace) > 0
        else "parent graph"
    )

# Using subgraphs=True to include subgraph outputs in (namespace, chunk) format
for namespace, chunk in graph.stream(inputs, stream_mode="updates", subgraphs=True):
    # node_name: current processing node name
    # node_chunk: chunk data for that node
    for node_name, node_chunk in chunk.items():
        print(f"\n========= Update from node [{node_name}] in [{format_namespace(namespace)}] =========\n")

        # Output node's chunk data
        if "messages" in node_chunk:
            node_chunk["messages"][-1].pretty_print()
        else:
            print(node_chunk)
    ========= Update from node [chatbot] in [news_subgraph] =========
    
    ================================== Ai Message ==================================
    Tool Calls:
      search_keyword (call_oydQcSskSQeUUPFeh9iCATpT)
     Call ID: call_oydQcSskSQeUUPFeh9iCATpT
      Args:
        query: AI
    
    ========= Update from node [tools] in [news_subgraph] =========
    
    ================================= Tool Message =================================
    Name: search_keyword
    
    - Trump announces up to $500 billion in private sector AI infrastructure investment - CBS News
    - Trump Announces $100 Billion A.I. Initiative - The New York Times
    - Oracle shares rise 9% after Trump unveils $500 bln AI plan - Reuters
    - Google Invests Another $1 Billion in AI Developer Anthropic - Bloomberg
    - Trump announces $500 billion investment for A.I. infrastructure project - KTVU FOX 2 San Francisco
    
    ========= Update from node [chatbot] in [news_subgraph] =========
    
    ================================== Ai Message ==================================
    
    Here are the latest news articles related to AI:
    
    1. **Trump announces up to $500 billion in private sector AI infrastructure investment** - CBS News
    2. **Trump Announces $100 Billion A.I. Initiative** - The New York Times
    3. **Oracle shares rise 9% after Trump unveils $500 bln AI plan** - Reuters
    4. **Google Invests Another $1 Billion in AI Developer Anthropic** - Bloomberg
    5. **Trump announces $500 billion investment for A.I. infrastructure project** - KTVU FOX 2 San Francisco
    
    Let me know if you need more information on any of these articles!
    
    ========= Update from node [news_subgraph] in [parent graph] =========
    
    ================================== Ai Message ==================================
    
    Here are the latest news articles related to AI:
    
    1. **Trump announces up to $500 billion in private sector AI infrastructure investment** - CBS News
    2. **Trump Announces $100 Billion A.I. Initiative** - The New York Times
    3. **Oracle shares rise 9% after Trump unveils $500 bln AI plan** - Reuters
    4. **Google Invests Another $1 Billion in AI Developer Anthropic** - Bloomberg
    5. **Trump announces $500 billion investment for A.I. infrastructure project** - KTVU FOX 2 San Francisco
    
    Let me know if you need more information on any of these articles!
    
    ========= Update from node [sns_post] in [parent graph] =========
    
    ================================== Ai Message ==================================
    
    🚀 Big news in AI! Trump unveils a massive $500B investment in AI infrastructure! 💰 Plus, Google drops another $1B into Anthropic! 🤖✨ #AI #Investment #TechNews #Innovation

Streaming LLM Output Token by Token Inside Subgraphs

In each event, kind holds the event type (for example, on_chat_model_stream or on_tool_start).
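One way to organize handling by event type is a dispatch table instead of an if/elif chain. The sketch below is only an illustration on simulated event dicts: `HANDLERS`, `render`, and the `SimpleNamespace` objects standing in for message chunks are assumptions made for the example, not part of LangGraph.

```python
from types import SimpleNamespace

# Map each event type to a function that turns the event data into text
HANDLERS = {
    "on_chat_model_start": lambda data: "\n[model start]\n",
    "on_chat_model_stream": lambda data: data["chunk"].content,
    "on_tool_start": lambda data: f"\n[tool start] {data.get('input')}\n",
}


def render(event: dict) -> str:
    """Return the text for one event, or an empty string if it is not handled."""
    handler = HANDLERS.get(event["event"])
    return handler(event.get("data", {})) if handler else ""


# Simulated events; SimpleNamespace stands in for a streamed message chunk
events = [
    {"event": "on_chat_model_start", "data": {}},
    {"event": "on_chat_model_stream", "data": {"chunk": SimpleNamespace(content="Hel")}},
    {"event": "on_chat_model_stream", "data": {"chunk": SimpleNamespace(content="lo")}},
    {"event": "on_chain_end", "data": {}},  # no handler registered, so it is ignored
]

text = "".join(render(event) for event in events)
print(text)
```

Adding support for another event type then means adding one entry to the table rather than another branch.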

# Function to parse namespace information
def parse_namespace_info(info: tuple) -> tuple[str, str]:
    if len(info) > 1:
        namespace, node_name = info
        return node_name.split(":")[0], namespace.split(":")[0]
    return info[0].split(":")[0], "parent graph"

kind = None

async for event in graph.astream_events(inputs, version="v2", subgraphs=True):
    kind = event["event"]

    # Announce the start of each chat model call
    if kind == "on_chat_model_start":
        print("\n========= on_chat_model_start =========\n")

    # Handle token chunks streamed by the chat model
    elif kind == "on_chat_model_stream":
        data = event["data"]

        # Stream output token by token
        if data["chunk"].content:
            print(data["chunk"].content, end="", flush=True)

    elif kind == "on_tool_start":
        print(f"\n========= tool_start =========\n")
        data = event["data"]
        if "input" in data:
            tool_msg = data["input"]
            print(tool_msg)            

    elif kind == "on_tool_end":
        print(f"\n========= tool_end =========\n")
        data = event["data"]
        if "output" in data:
            tool_msg = data["output"]
            print(tool_msg.content)
    ========= on_chat_model_start =========
    
    
    ========= tool_start =========
    
    {'query': 'AI'}
    
    ========= tool_end =========
    
    - Trump announces up to $500 billion in private sector AI infrastructure investment - CBS News
    - Trump Announces $100 Billion A.I. Initiative - The New York Times
    - Oracle shares rise 9% after Trump unveils $500 bln AI plan - Reuters
    - Google Invests Another $1 Billion in AI Developer Anthropic - Bloomberg
    - Trump announces $500 billion investment for A.I. infrastructure project - KTVU FOX 2 San Francisco
    
    ========= on_chat_model_start =========
    
    Here are the latest news articles related to AI:
    
    1. **Trump announces up to $500 billion in private sector AI infrastructure investment** - CBS News
    2. **Trump Announces $100 Billion A.I. Initiative** - The New York Times
    3. **Oracle shares rise 9% after Trump unveils $500 bln AI plan** - Reuters
    4. **Google Invests Another $1 Billion in AI Developer Anthropic** - Bloomberg
    5. **Trump announces $500 billion investment for A.I. infrastructure project** - KTVU FOX 2 San Francisco
    
    If you would like more information on any specific article, let me know!
    ========= on_chat_model_start =========
    
    🚀 Big moves in AI! Trump unveils a massive $500B investment for AI infrastructure! 💰 Plus, Google boosts Anthropic with another $1B! Exciting times ahead! #AI #Investment #TechNews #Innovation

Streaming Output for Specific Tags Only

ONLY_STREAM_TAGS lists the tags whose output should be streamed.

Here, output tagged "WANT_TO_STREAM2" is excluded and only output tagged "WANT_TO_STREAM" is printed.

# Function to parse namespace information
def parse_namespace_info(info: tuple) -> tuple[str, str]:
    if len(info) > 1:
        namespace, node_name = info
        return node_name.split(":")[0], namespace.split(":")[0]
    return info[0].split(":")[0], "parent graph"

# Set tags for streaming output
ONLY_STREAM_TAGS = ["WANT_TO_STREAM"]

kind = None
tags = None

async for event in graph.astream_events(inputs, version="v2", subgraphs=True):
    kind = event["event"]
    tags = event.get("tags", [])

    # Show the tags attached to each chat model call
    if kind == "on_chat_model_start":
        print(f"\n========= tags: {tags} =========\n")

    # Stream tokens only from chat models carrying a selected tag
    elif kind == "on_chat_model_stream":
        if any(tag in ONLY_STREAM_TAGS for tag in tags):
            data = event["data"]

            # Output message
            if data["chunk"].content:
                print(data["chunk"].content, end="", flush=True)
    elif kind == "on_tool_start":
        print(f"\n========= tool_start =========\n")
        data = event["data"]
        if "input" in data:
            tool_msg = data["input"]
            print(tool_msg)            

    elif kind == "on_tool_end":
        print(f"\n========= tool_end =========\n")
        data = event["data"]
        if "output" in data:
            tool_msg = data["output"]
            print(tool_msg.content)
    ========= tags: ['seq:step:1', 'WANT_TO_STREAM'] =========
    
========= tool_start =========

{'query': 'AI'}

========= tool_end =========

- Trump announces up to $500 billion in private sector AI infrastructure investment - CBS News
- Trump Announces $100 Billion A.I. Initiative - The New York Times
- Oracle shares rise 9% after Trump unveils $500 bln AI plan - Reuters
- Google Invests Another $1 Billion in AI Developer Anthropic - Bloomberg
- Trump announces $500 billion investment for A.I. infrastructure project - KTVU FOX 2 San Francisco

========= tags: ['seq:step:1', 'WANT_TO_STREAM'] =========

Here are the latest news articles related to AI:

1. **Trump announces up to $500 billion in private sector AI infrastructure investment** - CBS News
2. **Trump Announces $100 Billion A.I. Initiative** - The New York Times
3. **Oracle shares rise 9% after Trump unveils $500 bln AI plan** - Reuters
4. **Google Invests Another $1 Billion in AI Developer Anthropic** - Bloomberg
5. **Trump announces $500 billion investment for A.I. infrastructure project** - KTVU FOX 2 San Francisco

Let me know if you would like more information on any of these articles!
========= tags: ['seq:step:1', 'WANT_TO_STREAM2'] =========
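The tag check used above can also be isolated as a small predicate, which makes it easy to try out without running the graph. This is a sketch on hand-written event dicts: `should_stream` is a hypothetical helper, and the sample events only mimic the "event" and "tags" keys seen in the output above.

```python
ONLY_STREAM_TAGS = {"WANT_TO_STREAM"}


def should_stream(event: dict, wanted: set) -> bool:
    """True for chat model token events that carry at least one wanted tag."""
    return (
        event["event"] == "on_chat_model_stream"
        and not wanted.isdisjoint(event.get("tags", []))
    )


# Hand-written events that mimic the keys used in this section
events = [
    {"event": "on_chat_model_stream", "tags": ["seq:step:1", "WANT_TO_STREAM"]},
    {"event": "on_chat_model_stream", "tags": ["seq:step:1", "WANT_TO_STREAM2"]},
    {"event": "on_tool_start", "tags": ["WANT_TO_STREAM"]},
]

decisions = [should_stream(event, ONLY_STREAM_TAGS) for event in events]
print(decisions)
```

Keeping the wanted tags in a set means an event with several matching tags is still streamed once.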

Setting up your environment is the first step. See the Environment Setup guide for more details.

langchain-opentutorial is a package that provides easy-to-use environment setup guidance, along with useful functions and utilities for the tutorials. Check out langchain-opentutorial for more details.

See the LangChain astream_events() reference for all event types.

Yejin Park
Chaeyoon Kim