LangChain OpenTutorial

Generator



  • Author:

  • Design:

  • Peer Review:

  • Proofread :

  • This is a part of LangChain Open Tutorial

Overview

This tutorial demonstrates how to use a user-defined generator (or asynchronous generator) within a LangChain pipeline to process text outputs in a streaming manner. Specifically, we’ll show how to parse a comma-separated string output into a Python list, leveraging the benefits of streaming from a language model. We will also cover asynchronous usage, showing how to adopt the same approach with async generators.

By the end of this tutorial, you’ll be able to:

  • Implement a custom generator function that can handle streaming outputs.

  • Parse comma-separated text chunks into a list in real time.

  • Use both synchronous and asynchronous approaches for streaming data.

  • Integrate these parsers into a LangChain chain.

  • Optionally, explore how RunnableGenerator can be used to implement custom generator transformations within a streaming context.

Environment Setup

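Setting up your environment is the first step. See the Environment Setup guide for more details.

[Note]

  • langchain-opentutorial is a package that provides easy-to-use environment setup guidance, along with useful functions and utilities for the tutorials.

  • Check out langchain-opentutorial for more details.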

%%capture --no-stderr
%pip install langchain-opentutorial

# Install required packages
from langchain_opentutorial import package

package.install(
    [
        "langsmith",
        "langchain",
        "langchain_openai",
        "langchain_core",
        "langchain_community",
    ],
    verbose=False,
    upgrade=False,
)

# Set environment variables
from langchain_opentutorial import set_env

set_env(
    {
        "OPENAI_API_KEY": "",
        "LANGCHAIN_API_KEY": "",
        "LANGCHAIN_TRACING_V2": "true",
        "LANGCHAIN_ENDPOINT": "https://api.smith.langchain.com",
        "LANGCHAIN_PROJECT": "09-Generator",
    }
)
Environment variables have been set successfully.

Alternatively, you can set and load OPENAI_API_KEY from a .env file.

[Note] This is only necessary if you haven't already set OPENAI_API_KEY in previous steps.

from dotenv import load_dotenv

load_dotenv(override=True)
True

Implementing a Comma-Separated List Parser with a Custom Generator

When working with language models, you might receive outputs as plain text, such as comma-separated strings. To parse these into a structured format (e.g., a list) as they are generated, you can implement a custom generator function. This retains the streaming benefits — observing partial outputs in real time — while transforming the data into a more usable format.

Synchronous Parsing

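In this section, we define a custom generator function called split_into_list(). It appends each incoming chunk of tokens (strings) to a buffer. Whenever the buffer contains a comma, it yields the text before that comma, stripped of surrounding whitespace, as a single-element list and keeps the remainder in the buffer. Because the buffer persists across chunks, an item that is split over several chunks is still parsed correctly. Once the stream ends, it yields whatever is left in the buffer as the final item.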

from typing import Iterator, List


# A user-defined parser that splits a stream of tokens into a comma-separated list
def split_into_list(input: Iterator[str]) -> Iterator[List[str]]:
    buffer = ""
    for chunk in input:
        # Accumulate tokens in the buffer
        buffer += chunk
        # Whenever we find a comma, split and yield the segment
        while "," in buffer:
            comma_index = buffer.index(",")
            yield [buffer[:comma_index].strip()]
            buffer = buffer[comma_index + 1 :]
    # Finally, yield whatever remains in the buffer
    yield [buffer.strip()]
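
To see the buffering at work without calling a model, here is a minimal sketch that feeds the parser a hand-written stream. The chunk boundaries in fake_stream are made up for illustration; a real model decides its own boundaries, and they rarely line up with the commas.

```python
from typing import Iterator, List


def split_into_list(input: Iterator[str]) -> Iterator[List[str]]:
    buffer = ""
    for chunk in input:
        # Accumulate tokens in the buffer
        buffer += chunk
        # Emit one item for every comma currently in the buffer
        while "," in buffer:
            comma_index = buffer.index(",")
            yield [buffer[:comma_index].strip()]
            buffer = buffer[comma_index + 1 :]
    # Emit whatever remains after the stream ends
    yield [buffer.strip()]


# Hypothetical token chunks: note that "Microsoft" and "Apple" are split
# across chunk boundaries.
fake_stream = iter(["Micro", "soft, App", "le,", " Amazon"])

parsed = list(split_into_list(fake_stream))
print(parsed)  # [['Microsoft'], ['Apple'], ['Amazon']]
```

One consequence of always yielding the remaining buffer: if the text ends with a trailing comma, the last item is an empty string. Filter it out downstream if that matters for your use case.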

We then construct a LangChain pipeline that:

  • Defines a prompt template for comma-separated outputs.

  • Uses ChatOpenAI with temperature=0.0 to make responses as deterministic as possible.

  • Converts the raw output to a string using StrOutputParser.

  • Pipes ( | ) the string output into split_into_list() for parsing.

from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_openai import ChatOpenAI

prompt = ChatPromptTemplate.from_template(
    "Write a comma-separated list of 5 companies similar to: {company}"
)

# Initialize the model with temperature=0.0 to make the output as deterministic as possible
model = ChatOpenAI(temperature=0.0, model="gpt-4o")

# Chain 1: Convert to a string
str_chain = prompt | model | StrOutputParser()

# Chain 2: Parse the comma-separated string into a list using our generator
list_chain = str_chain | split_into_list

By streaming the output through list_chain, you can observe the partial results in real time. Each list item appears as soon as the parser encounters a comma in the stream.

# Stream the parsed data
for chunk in list_chain.stream({"company": "Google"}):
    print(chunk, flush=True)
['Microsoft']
['Apple']
['Amazon']
['Facebook']
['IBM']

If you need the entire parsed list at once (after the entire generation process is completed), you can use the .invoke() method instead of streaming.

output = list_chain.invoke({"company": "Google"})
print(output)
['Microsoft', 'Apple', 'Amazon', 'Facebook', 'IBM']
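
Why does invoke() return one flat list when the generator only ever yields single-element lists? Our understanding is that LangChain combines the chunks a generator step yields by adding them together with +, and for lists that means concatenation. The sketch below reproduces that idea in plain Python; it is an illustration of the aggregation, not LangChain's actual code, and streamed_chunks is simply copied from the streaming output shown earlier.

```python
from typing import List, Optional

# The chunks produced by streaming, one single-element list per item
streamed_chunks: List[List[str]] = [
    ["Microsoft"],
    ["Apple"],
    ["Amazon"],
    ["Facebook"],
    ["IBM"],
]

# Fold the chunks together with "+", which concatenates lists
final: Optional[List[str]] = None
for chunk in streamed_chunks:
    final = chunk if final is None else final + chunk

print(final)  # ['Microsoft', 'Apple', 'Amazon', 'Facebook', 'IBM']
```

This is also why the generator yields lists rather than bare strings: adding bare strings would glue the company names into a single string.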

Asynchronous Parsing

The method described above works for synchronous iteration. However, some applications need asynchronous operations so that waiting on the model does not block the event loop. The following section shows how to achieve the same comma-separated parsing using an async generator.

The asplit_into_list() function works like its synchronous counterpart, buffering tokens until a comma is encountered. However, it uses the async for construct to consume an asynchronous stream.

from typing import AsyncIterator


async def asplit_into_list(input: AsyncIterator[str]) -> AsyncIterator[List[str]]:
    buffer = ""
    async for chunk in input:
        buffer += chunk
        while "," in buffer:
            comma_index = buffer.index(",")
            yield [buffer[:comma_index].strip()]
            buffer = buffer[comma_index + 1 :]
    yield [buffer.strip()]
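
The async parser can also be tried without a model. The sketch below is a minimal, self-contained example: fake_token_stream() is a hypothetical stand-in for a model's streamed tokens, and asyncio.run() drives everything so that the code works in a regular script rather than only in a notebook.

```python
import asyncio
from typing import AsyncIterator, List


async def asplit_into_list(input: AsyncIterator[str]) -> AsyncIterator[List[str]]:
    buffer = ""
    async for chunk in input:
        buffer += chunk
        while "," in buffer:
            comma_index = buffer.index(",")
            yield [buffer[:comma_index].strip()]
            buffer = buffer[comma_index + 1 :]
    yield [buffer.strip()]


async def fake_token_stream() -> AsyncIterator[str]:
    # Hypothetical chunks standing in for a model's streamed tokens
    for chunk in ["Micro", "soft, App", "le,", " Amazon"]:
        await asyncio.sleep(0)  # hand control back to the event loop
        yield chunk


async def collect() -> List[List[str]]:
    # Consume the async generator and gather every parsed item
    return [item async for item in asplit_into_list(fake_token_stream())]


async_parsed = asyncio.run(collect())
print(async_parsed)  # [['Microsoft'], ['Apple'], ['Amazon']]
```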

Then, you can pipe the asynchronous parser into a chain like the synchronous version.

alist_chain = str_chain | asplit_into_list

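When you call astream(), you can process each chunk as it becomes available within an asynchronous context. The top-level async for below works in a notebook such as Jupyter; in a regular script, place it inside an async function and run that function with asyncio.run().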

async for chunk in alist_chain.astream({"company": "Google"}):
    print(chunk, flush=True)
['Microsoft']
['Apple']
['Amazon']
['Facebook']
['IBM']

Similarly, you can get the entire parsed list using the asynchronous ainvoke() method.

result = await alist_chain.ainvoke({"company": "Google"})
print(result)
['Microsoft', 'Apple', 'Amazon', 'Facebook', 'IBM']

Using RunnableGenerator with Our Comma-Separated List Parser

In addition to implementing your own generator functions directly, LangChain offers the RunnableGenerator class for more advanced or modular streaming behavior. This approach wraps your generator logic in a Runnable, so you can plug it into a chain while preserving partial output streaming. Below, we modify our comma-separated list parser to demonstrate how RunnableGenerator can be applied.

Advantages of RunnableGenerator

  • Modularity: Easily encapsulate your parsing logic as a Runnable component.

  • Consistency: The RunnableGenerator interface (invoke, stream, ainvoke, astream) is consistent with other LangChain Runnables.

  • Extensibility: Combine multiple Runnables (e.g., RunnableLambda, RunnableGenerator) in sequence for more complex transformations.
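
The idea of combining streaming transformations in sequence can be sketched with plain Python generators, without LangChain. In the example below, split_on_commas() repeats the comma-splitting logic from this tutorial, while uppercase_items() is a hypothetical second stage added purely for illustration. In a LangChain chain, the analogous composition would use the | operator between Runnables.

```python
from typing import Iterator, List


def split_on_commas(chunks: Iterator[str]) -> Iterator[List[str]]:
    """Stage 1: the same comma-splitting logic used in this tutorial."""
    buffer = ""
    for chunk in chunks:
        buffer += chunk
        while "," in buffer:
            comma_index = buffer.index(",")
            yield [buffer[:comma_index].strip()]
            buffer = buffer[comma_index + 1 :]
    yield [buffer.strip()]


def uppercase_items(items: Iterator[List[str]]) -> Iterator[List[str]]:
    """Stage 2 (hypothetical): transform each parsed item as it arrives."""
    for item in items:
        yield [text.upper() for text in item]


# Generators compose lazily: stage 2 pulls from stage 1 one item at a time,
# so each item flows through both stages before the next one is parsed.
pipeline = uppercase_items(split_on_commas(iter(["goo", "gle, app", "le"])))

composed = list(pipeline)
print(composed)  # [['GOOGLE'], ['APPLE']]
```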

Transforming the Same Parser Logic

Previously, we defined split_into_list() as a standalone Python generator function. Now, let’s create an equivalent transform function, specifically designed for use with RunnableGenerator. Our goal remains the same: we want to parse a streaming sequence of tokens into a list of individual items upon encountering a comma.

from langchain_core.runnables import RunnableGenerator
from typing import Iterator, List


def comma_parser_runnable(input_iter: Iterator[str]) -> Iterator[List[str]]:
    """
    This function accumulates tokens from input_iter and yields
    each chunk split by commas as a list.
    """
    buffer = ""
    for chunk in input_iter:
        buffer += chunk
        # Whenever we find a comma, split and yield
        while "," in buffer:
            comma_index = buffer.index(",")
            yield [buffer[:comma_index].strip()]
            buffer = buffer[comma_index + 1 :]
    # Finally, yield whatever remains
    yield [buffer.strip()]


# Wrap it in a RunnableGenerator
parser_runnable = RunnableGenerator(comma_parser_runnable)

We can now integrate parser_runnable into the same prompt-and-model pipeline we used before.

list_chain_via_runnable = str_chain | parser_runnable

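When run, partial outputs appear as single-element lists, just as with our original custom generator approach.

Piping a generator function directly into a chain, as we did earlier, already coerces it into a RunnableGenerator behind the scenes. Constructing the RunnableGenerator explicitly makes that wrapping visible and gives you a reusable Runnable component.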

# Stream partial results
for parsed_chunk in list_chain_via_runnable.stream({"company": "Google"}):
    print(parsed_chunk)
['Microsoft']
['Apple']
['Amazon']
['Facebook']
['IBM']

References

  • LangChain ChatOpenAI API reference

  • LangChain custom functions

  • LangChain RunnableGenerator

  • Python Generators Documentation

  • Python Async IO Documentation
Junseong Kim
Junseong Kim
Chaeyoon Kim