Generator
Author: Junseong Kim
Design: Junseong Kim
Peer Review:
Proofread: Chaeyoon Kim
This is part of the LangChain Open Tutorial.
Overview
This tutorial demonstrates how to use a user-defined generator (or asynchronous generator) within a LangChain pipeline to process text outputs in a streaming manner. Specifically, we’ll show how to parse a comma-separated string output into a Python list, leveraging the benefits of streaming from a language model. We will also cover asynchronous usage, showing how to adopt the same approach with async generators.
By the end of this tutorial, you’ll be able to:
Implement a custom generator function that can handle streaming outputs.
Parse comma-separated text chunks into a list in real time.
Use both synchronous and asynchronous approaches for streaming data.
Integrate these parsers into a LangChain chain.
Optionally, explore how RunnableGenerator can be used to implement custom generator transformations within a streaming context.
Environment Setup
Setting up your environment is the first step. See the Environment Setup guide for more details.
[Note] The langchain-opentutorial package provides easy-to-use environment setup guidance, along with useful functions and utilities for these tutorials. Check out langchain-opentutorial for more details.
%%capture --no-stderr
%pip install langchain-opentutorial
# Install required packages
from langchain_opentutorial import package

package.install(
    [
        "langsmith",
        "langchain",
        "langchain_openai",
        "langchain_core",
        "langchain_community",
    ],
    verbose=False,
    upgrade=False,
)
# Set environment variables
from langchain_opentutorial import set_env

set_env(
    {
        "OPENAI_API_KEY": "",
        "LANGCHAIN_API_KEY": "",
        "LANGCHAIN_TRACING_V2": "true",
        "LANGCHAIN_ENDPOINT": "https://api.smith.langchain.com",
        "LANGCHAIN_PROJECT": "09-Generator",
    }
)
Environment variables have been set successfully.
Alternatively, you can set and load OPENAI_API_KEY from a .env file.
[Note] This is only necessary if you haven't already set OPENAI_API_KEY in previous steps.
from dotenv import load_dotenv
load_dotenv(override=True)
True
Implementing a Comma-Separated List Parser with a Custom Generator
When working with language models, you might receive outputs as plain text, such as comma-separated strings. To parse these into a structured format (e.g., a list) as they are generated, you can implement a custom generator function. This retains the streaming benefits — observing partial outputs in real time — while transforming the data into a more usable format.
Synchronous Parsing
In this section, we define a custom generator function called split_into_list(). For each incoming chunk of tokens (strings), it accumulates text in a buffer until a comma is encountered. At each comma, it yields the stripped segment as a single-item list.
from typing import Iterator, List

# A user-defined parser that splits a stream of tokens into a comma-separated list
def split_into_list(input: Iterator[str]) -> Iterator[List[str]]:
    buffer = ""
    for chunk in input:
        # Accumulate tokens in the buffer
        buffer += chunk
        # Whenever we find a comma, split and yield the segment
        while "," in buffer:
            comma_index = buffer.index(",")
            yield [buffer[:comma_index].strip()]
            buffer = buffer[comma_index + 1 :]
    # Finally, yield whatever remains in the buffer
    yield [buffer.strip()]
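Before wiring the parser into a chain, it can help to see it run on its own. The sketch below (not part of the original tutorial) simulates token chunks as a model might stream them, with commas arriving mid-chunk, and redefines the parser so the cell runs standalone.

```python
from typing import Iterator, List

# split_into_list as defined above, repeated here so this cell runs standalone.
def split_into_list(input: Iterator[str]) -> Iterator[List[str]]:
    buffer = ""
    for chunk in input:
        buffer += chunk
        while "," in buffer:
            comma_index = buffer.index(",")
            yield [buffer[:comma_index].strip()]
            buffer = buffer[comma_index + 1 :]
    yield [buffer.strip()]

# Simulated token chunks: note that commas may arrive in the middle of a chunk.
chunks = ["Micro", "soft, App", "le, Amazon"]
parsed = list(split_into_list(iter(chunks)))
print(parsed)  # [['Microsoft'], ['Apple'], ['Amazon']]
```

Each comma triggers a yield, so items become available before the stream ends — exactly the behavior we rely on when streaming from a model.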
We then construct a LangChain pipeline that:
Defines a prompt template for comma-separated outputs.
Uses ChatOpenAI with temperature=0.0 for deterministic responses.
Converts the raw output to a string using StrOutputParser.
Pipes ( | ) the string output into split_into_list() for parsing.
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_openai import ChatOpenAI
prompt = ChatPromptTemplate.from_template(
    "Write a comma-separated list of 5 companies similar to: {company}"
)
# Initialize the model with temperature=0.0 for deterministic output
model = ChatOpenAI(temperature=0.0, model="gpt-4o")
# Chain 1: Convert to a string
str_chain = prompt | model | StrOutputParser()
# Chain 2: Parse the comma-separated string into a list using our generator
list_chain = str_chain | split_into_list
By streaming the output through list_chain, you can observe the partial results in real time. Each list item appears as soon as the parser encounters a comma in the stream.
# Stream the parsed data
for chunk in list_chain.stream({"company": "Google"}):
    print(chunk, flush=True)
['Microsoft']
['Apple']
['Amazon']
['Facebook']
['IBM']
If you need the entire parsed list at once (after the entire generation process is completed), you can use the .invoke() method instead of streaming.
output = list_chain.invoke({"company": "Google"})
print(output)
['Microsoft', 'Apple', 'Amazon', 'Facebook', 'IBM']
Asynchronous Parsing
The method described above works for synchronous iteration. However, some applications may require asynchronous operations to prevent blocking the main thread. The following section shows how to achieve the same comma-separated parsing using an async generator.
asplit_into_list() works like its synchronous counterpart, accumulating tokens until a comma is encountered, but it uses the async for construct to consume asynchronous data streams.
from typing import AsyncIterator
async def asplit_into_list(input: AsyncIterator[str]) -> AsyncIterator[List[str]]:
    buffer = ""
    async for chunk in input:
        buffer += chunk
        while "," in buffer:
            comma_index = buffer.index(",")
            yield [buffer[:comma_index].strip()]
            buffer = buffer[comma_index + 1 :]
    yield [buffer.strip()]
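The async parser can also be exercised without calling a model. The sketch below (an illustrative addition, not from the original tutorial) drives it with a fake async token stream and redefines the parser so the cell runs standalone.

```python
import asyncio
from typing import AsyncIterator, List

# asplit_into_list as defined above, repeated here so this cell runs standalone.
async def asplit_into_list(input: AsyncIterator[str]) -> AsyncIterator[List[str]]:
    buffer = ""
    async for chunk in input:
        buffer += chunk
        while "," in buffer:
            comma_index = buffer.index(",")
            yield [buffer[:comma_index].strip()]
            buffer = buffer[comma_index + 1 :]
    yield [buffer.strip()]

# A fake async token stream standing in for a model's astream() output.
async def fake_stream() -> AsyncIterator[str]:
    for chunk in ["Micro", "soft, App", "le, Amazon"]:
        yield chunk

async def collect() -> List[List[str]]:
    return [item async for item in asplit_into_list(fake_stream())]

aparsed = asyncio.run(collect())
print(aparsed)  # [['Microsoft'], ['Apple'], ['Amazon']]
```

Note that inside a Jupyter notebook an event loop is already running, so use `await collect()` directly instead of `asyncio.run()`.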
Then, you can pipe the asynchronous parser into a chain like the synchronous version.
alist_chain = str_chain | asplit_into_list
When you call astream(), you can process each incoming data chunk as it becomes available within an asynchronous context.
async for chunk in alist_chain.astream({"company": "Google"}):
    print(chunk, flush=True)
['Microsoft']
['Apple']
['Amazon']
['Facebook']
['IBM']
Similarly, you can get the entire parsed list using the asynchronous ainvoke() method.
result = await alist_chain.ainvoke({"company": "Google"})
print(result)
['Microsoft', 'Apple', 'Amazon', 'Facebook', 'IBM']
Using RunnableGenerator with Our Comma-Separated List Parser
In addition to implementing your own generator functions directly, LangChain offers the RunnableGenerator class for more advanced or modular streaming behavior. This approach wraps your generator logic in a Runnable, making it easy to plug into a chain while preserving partial output streaming. Below, we adapt our comma-separated list parser to demonstrate how RunnableGenerator can be applied.
Advantages of RunnableGenerator
Modularity: Easily encapsulate your parsing logic as a Runnable component.
Consistency: The RunnableGenerator interface (invoke, stream, ainvoke, astream) is consistent with other LangChain Runnables.
Extendability: Combine multiple Runnables (e.g., RunnableLambda, RunnableGenerator) in sequence for more complex transformations.
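The extendability point can be illustrated with plain Python generators: streaming transformations compose by nesting one generator inside another, which is essentially what piping Runnables does under the hood. The sketch below is a pure-Python analogue (no LangChain API involved), chaining a comma-splitting stage with a second, hypothetical uppercasing stage.

```python
from typing import Iterator, List

# Stage 1: split a token stream into single-item lists at commas
# (same logic as the tutorial's parser).
def split_commas(tokens: Iterator[str]) -> Iterator[List[str]]:
    buffer = ""
    for chunk in tokens:
        buffer += chunk
        while "," in buffer:
            i = buffer.index(",")
            yield [buffer[:i].strip()]
            buffer = buffer[i + 1 :]
    yield [buffer.strip()]

# Stage 2: a second streaming transform, e.g. uppercasing each item.
def uppercase(items: Iterator[List[str]]) -> Iterator[List[str]]:
    for item in items:
        yield [s.upper() for s in item]

# Composition by nesting -- conceptually what chaining Runnables with | does.
stream = iter(["goo", "gle, micro", "soft"])
result = list(uppercase(split_commas(stream)))
print(result)  # [['GOOGLE'], ['MICROSOFT']]
```

Because each stage consumes and yields incrementally, the composed pipeline still streams: an uppercased item is emitted as soon as its comma arrives.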
Transforming the Same Parser Logic
Previously, we defined split_into_list() as a standalone Python generator function. Now, let's create an equivalent transform function designed for use with RunnableGenerator. Our goal remains the same: parse a streaming sequence of tokens into a list of individual items upon encountering a comma.
from langchain_core.runnables import RunnableGenerator
from typing import Iterator, List
def comma_parser_runnable(input_iter: Iterator[str]) -> Iterator[List[str]]:
    """
    Accumulate tokens from input_iter and yield each
    comma-separated segment as a single-item list.
    """
    buffer = ""
    for chunk in input_iter:
        buffer += chunk
        # Whenever we find a comma, split and yield
        while "," in buffer:
            comma_index = buffer.index(",")
            yield [buffer[:comma_index].strip()]
            buffer = buffer[comma_index + 1 :]
    # Finally, yield whatever remains
    yield [buffer.strip()]

# Wrap it in a RunnableGenerator
parser_runnable = RunnableGenerator(comma_parser_runnable)
We can now integrate parser_runnable into the same prompt-and-model pipeline we used before.
list_chain_via_runnable = str_chain | parser_runnable
When run, partial outputs will appear as single-element lists, just like our original custom generator approach. The difference is that we're now using RunnableGenerator to encapsulate the logic in a more modular, LangChain-native way.
# Stream partial results
for parsed_chunk in list_chain_via_runnable.stream({"company": "Google"}):
    print(parsed_chunk)
['Microsoft']
['Apple']
['Amazon']
['Facebook']
['IBM']