WithListeners
Author: Donghak Lee
Peer Review:
Proofread: Q0211
This is part of the LangChain OpenTutorial.
Overview
This tutorial covers the implementation and usage of with_listeners() in Runnable.
with_listeners() binds lifecycle listeners to a Runnable and returns a new Runnable. This lets you connect event listeners to the data flow, enabling tracking, analysis, and debugging during execution.
The with_listeners() function adds event listeners to a Runnable object. Listeners are functions that are called when specific events occur, such as start, end, or error.
This function is useful in the following scenarios:
Logging the start and end of data processing
Triggering notifications on errors
Printing debugging information
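Before diving into the LangChain API, the core idea can be sketched in plain Python: a wrapper that fires start, end, and error callbacks around a function. This is only an illustration of the concept, not LangChain's actual with_listeners() implementation.

```python
# Minimal sketch of the listener idea (illustrative only -- not
# LangChain's actual implementation of with_listeners()).
def with_listeners(func, on_start=None, on_end=None, on_error=None):
    def wrapped(value):
        if on_start:
            on_start(value)          # fires before the work runs
        try:
            result = func(value)
        except Exception as exc:
            if on_error:
                on_error(exc)        # fires on failure, then re-raises
            raise
        if on_end:
            on_end(result)           # fires after the work completes
        return result
    return wrapped

events = []
step = with_listeners(
    lambda x: x.upper(),
    on_start=lambda v: events.append(("start", v)),
    on_end=lambda r: events.append(("end", r)),
)
print(step("hello"))  # HELLO
print(events)         # [('start', 'hello'), ('end', 'HELLO')]
```

LangChain's version follows the same shape, but the listeners receive a Run object describing the execution rather than the raw value.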
Environment Setup
Set up the environment. You may refer to Environment Setup for more details.
[Note]
langchain-opentutorial is a package that provides a set of easy-to-use environment setup, useful functions, and utilities for tutorials. You can check out langchain-opentutorial for more details.
%%capture --no-stderr
%pip install langchain-opentutorial
# Install required packages
from langchain_opentutorial import package
package.install(
    [
        "langchain_core",
        "langchain_openai",
        "datetime",
    ],
    verbose=False,
    upgrade=False,
)
# Set environment variables
from langchain_opentutorial import set_env
set_env(
    {
        "OPENAI_API_KEY": "",
        "LANGCHAIN_API_KEY": "",
        "LANGCHAIN_TRACING_V2": "true",
        "LANGCHAIN_ENDPOINT": "https://api.smith.langchain.com",
        "LANGCHAIN_PROJECT": "WithListeners",
    }
)
Environment variables have been set successfully.
You can alternatively set API keys such as OPENAI_API_KEY in a .env file and load them.
[Note] This is not necessary if you've already set the required API keys in previous steps.
# Load API keys from .env file
from dotenv import load_dotenv
load_dotenv(override=True)
with_listeners
with_listeners() takes listener functions (on_start, on_end, on_error) and returns a new Runnable object. Listener functions respond to start, end, and error events.
Using event listeners allows you to observe the data flow, and with_listeners() lets you register them flexibly.
Here is an example of using it.
from langchain_core.runnables import RunnableLambda
import time
# Define tasks for each Runnable
def stepOne(message):
    time.sleep(1)  # Wait for 1 second
    return f"Step 1 completed with message {message}"

def stepTwo(message):
    time.sleep(2)  # Wait for 2 seconds
    return f"Step 2 completed with message {message}"

# Define listener functions
def fnStart(runObj):
    print(f"Start: {runObj.inputs}")

def fnEnd(runObj):
    print(f"End: {runObj.outputs}")

def fnError(runObj):
    print(f"Error: {runObj.error}")

# Define each Runnable
runnable1 = RunnableLambda(stepOne).with_listeners(
    on_start=fnStart, on_end=fnEnd, on_error=fnError
)
runnable2 = RunnableLambda(stepTwo).with_listeners(
    on_start=fnStart, on_end=fnEnd, on_error=fnError
)
# Chain connection
chain = runnable1 | runnable2
# Execute
chain.invoke("Hello, World!")
Start: {'input': 'Hello, World!'}
End: {'output': 'Step 1 completed with message Hello, World!'}
Start: {'input': 'Step 1 completed with message Hello, World!'}
End: {'output': 'Step 2 completed with message Step 1 completed with message Hello, World!'}
'Step 2 completed with message Step 1 completed with message Hello, World!'
You can also register events on an entire LCEL chain using with_listeners().
def chainStart(runObj):
    print(f"Chain Start: {runObj.inputs}")

def chainEnd(runObj):
    print(f"Chain End: {runObj.outputs}")

chain_with_listeners = chain.with_listeners(
    on_start=chainStart, on_end=chainEnd, on_error=fnError
)
chain_with_listeners.invoke("Hello, World!")
Chain Start: {'input': 'Hello, World!'}
Start: {'input': 'Hello, World!'}
End: {'output': 'Step 1 completed with message Hello, World!'}
Start: {'input': 'Step 1 completed with message Hello, World!'}
End: {'output': 'Step 2 completed with message Step 1 completed with message Hello, World!'}
Chain End: {'output': 'Step 2 completed with message Step 1 completed with message Hello, World!'}
'Step 2 completed with message Step 1 completed with message Hello, World!'
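In the runs above the on_error listener never fires because no step raises. The behavior on failure can be sketched in plain Python (an illustrative stand-in, not LangChain's internals): the error listener observes the exception, and the exception still propagates to the caller.

```python
# Plain-Python sketch of error-listener behavior (illustrative only):
# the wrapped step raises, on_error sees the exception, and the
# exception still propagates to the caller.
def run_with_listeners(func, value, on_error):
    try:
        return func(value)
    except Exception as exc:
        on_error(exc)  # notify the listener...
        raise          # ...then re-raise so the caller still sees it

errors = []

def failing_step(_):
    raise ValueError("boom")

try:
    run_with_listeners(failing_step, "Hello", errors.append)
except ValueError:
    pass

print(errors)  # [ValueError('boom')]
```

With the real with_listeners(), the error listener receives a Run object whose error field carries the exception, as fnError above prints.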
with_alisteners
with_alisteners() binds asynchronous lifecycle listeners to a Runnable and returns a new Runnable.
on_start: called asynchronously before the Runnable starts running.
on_end: called asynchronously after the Runnable finishes running.
on_error: called asynchronously if the Runnable throws an error.
The Run object contains information about the run, including its id, type, input, output, error, start_time, end_time, and any tags or metadata added to the run.
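As a rough illustration of those fields, a Run-like record can be modeled with a small dataclass. This is a hypothetical stand-in for teaching purposes; the real schema lives in langchain_core.tracers.schemas.Run and has more fields.

```python
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Optional
import uuid

# Hypothetical stand-in for the Run object described above; the real
# schema is langchain_core.tracers.schemas.Run.
@dataclass
class RunInfo:
    id: str = field(default_factory=lambda: str(uuid.uuid4()))
    run_type: str = "chain"
    inputs: dict = field(default_factory=dict)
    outputs: Optional[dict] = None          # filled in on success
    error: Optional[str] = None             # filled in on failure
    start_time: datetime = field(default_factory=lambda: datetime.now(timezone.utc))
    end_time: Optional[datetime] = None
    tags: list = field(default_factory=list)
    metadata: dict = field(default_factory=dict)

run = RunInfo(inputs={"input": "Hello"})
print(run.run_type, run.inputs)  # chain {'input': 'Hello'}
```

A start listener typically reads inputs and start_time; an end listener reads outputs and end_time; an error listener reads error.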
import asyncio
async def testRunnable(time_to_sleep: int):
    print(f"Runnable[{time_to_sleep}s]: starts at {time.strftime('%S')}")
    await asyncio.sleep(time_to_sleep)
    print(f"Runnable[{time_to_sleep}s]: ends at {time.strftime('%S')}")

async def fnStart(runObj):
    print(f"runnable{runObj.inputs['input']}: {time.strftime('%S')}")
    await asyncio.sleep(3)
    print(f"runnable{runObj.inputs['input']}: {time.strftime('%S')}")

async def fnEnd(runObj):
    print(f"runnable{runObj.inputs['input']}: {time.strftime('%S')}")
    await asyncio.sleep(2)
    print(f"runnable{runObj.inputs['input']}: {time.strftime('%S')}")

runnable = RunnableLambda(testRunnable).with_alisteners(on_start=fnStart, on_end=fnEnd)

async def concurrentRuns():
    await asyncio.gather(runnable.ainvoke(2), runnable.ainvoke(3))

await concurrentRuns()
runnable2: 25
runnable3: 25
runnable2: 28
runnable3: 28
Runnable[2s]: starts at 28
Runnable[3s]: starts at 28
Runnable[2s]: ends at 30
runnable2: 30
Runnable[3s]: ends at 31
runnable3: 31
runnable2: 32
runnable3: 33
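The interleaved timestamps above come from asyncio.gather running both invocations concurrently, with each invocation awaiting its own on_start listener before its body runs. That scheduling can be sketched with plain asyncio (no LangChain; names and delays here are illustrative):

```python
import asyncio

# Each invocation awaits its own "listener" coroutine before running
# its body, while the two invocations interleave on the event loop --
# mirroring the ordering pattern in the output above.
order = []

async def on_start(name):
    order.append(f"{name}: listener start")
    await asyncio.sleep(0.01)
    order.append(f"{name}: listener done")

async def body(name, delay):
    await on_start(name)                 # listener runs first
    order.append(f"{name}: body start")
    await asyncio.sleep(delay)           # simulated work
    order.append(f"{name}: body end")

async def main():
    await asyncio.gather(body("a", 0.02), body("b", 0.03))

asyncio.run(main())
print(order)
```

Both listeners begin before either body starts, because each await on an asyncio.sleep yields control to the other task.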
RootListenersTracer
You can bind RootListenersTracer directly to a Runnable using RunnableBinding to register event listeners. This is what with_listeners() does internally.
RootListenersTracer calls the listeners on run start, end, and error.
from langchain_core.tracers.root_listeners import RootListenersTracer
from langchain_core.runnables.base import RunnableBinding
from langchain_openai import ChatOpenAI
# Define listener functions
def fnStart(runObj):
    print(f"Start: {runObj.inputs}")

def fnEnd(runObj):
    print(f"End: {runObj.outputs}")

def fnError(runObj):
    print(f"Error: {runObj.error}")

# LLM and chain setup
model = ChatOpenAI(model="gpt-4o-mini", temperature=0.0)

model_with_listeners = RunnableBinding(
    bound=model,
    config_factories=[
        lambda config: {
            "callbacks": [
                RootListenersTracer(
                    config=config,
                    on_start=fnStart,
                    on_end=fnEnd,
                    on_error=fnError,
                )
            ],
        }
    ],
)
model_with_listeners.invoke("Tell me the founding year of Google")
Start: {'messages': [[{'lc': 1, 'type': 'constructor', 'id': ['langchain', 'schema', 'messages', 'HumanMessage'], 'kwargs': {'content': 'Tell me the founding year of Google', 'type': 'human'}}]]}
End: {'generations': [[{'text': 'Google was founded in the year 1998.', 'generation_info': {'finish_reason': 'stop', 'logprobs': None}, 'type': 'ChatGeneration', 'message': {'lc': 1, 'type': 'constructor', 'id': ['langchain', 'schema', 'messages', 'AIMessage'], 'kwargs': {'content': 'Google was founded in the year 1998.', 'additional_kwargs': {'refusal': None}, 'response_metadata': {'token_usage': {'completion_tokens': 11, 'prompt_tokens': 14, 'total_tokens': 25, 'completion_tokens_details': {'accepted_prediction_tokens': 0, 'audio_tokens': 0, 'reasoning_tokens': 0, 'rejected_prediction_tokens': 0}, 'prompt_tokens_details': {'audio_tokens': 0, 'cached_tokens': 0}}, 'model_name': 'gpt-4o-mini-2024-07-18', 'system_fingerprint': 'fp_72ed7ab54c', 'finish_reason': 'stop', 'logprobs': None}, 'type': 'ai', 'id': 'run-6f335bec-171d-47a8-a508-85bb52307e10-0', 'usage_metadata': {'input_tokens': 14, 'output_tokens': 11, 'total_tokens': 25, 'input_token_details': {'audio': 0, 'cache_read': 0}, 'output_token_details': {'audio': 0, 'reasoning': 0}}, 'tool_calls': [], 'invalid_tool_calls': []}}}]], 'llm_output': {'token_usage': {'completion_tokens': 11, 'prompt_tokens': 14, 'total_tokens': 25, 'completion_tokens_details': {'accepted_prediction_tokens': 0, 'audio_tokens': 0, 'reasoning_tokens': 0, 'rejected_prediction_tokens': 0}, 'prompt_tokens_details': {'audio_tokens': 0, 'cached_tokens': 0}}, 'model_name': 'gpt-4o-mini-2024-07-18', 'system_fingerprint': 'fp_72ed7ab54c'}, 'run': None, 'type': 'LLMResult'}
AIMessage(content='Google was founded in the year 1998.', additional_kwargs={'refusal': None}, response_metadata={'token_usage': {'completion_tokens': 11, 'prompt_tokens': 14, 'total_tokens': 25, 'completion_tokens_details': {'accepted_prediction_tokens': 0, 'audio_tokens': 0, 'reasoning_tokens': 0, 'rejected_prediction_tokens': 0}, 'prompt_tokens_details': {'audio_tokens': 0, 'cached_tokens': 0}}, 'model_name': 'gpt-4o-mini-2024-07-18', 'system_fingerprint': 'fp_72ed7ab54c', 'finish_reason': 'stop', 'logprobs': None}, id='run-6f335bec-171d-47a8-a508-85bb52307e10-0', usage_metadata={'input_tokens': 14, 'output_tokens': 11, 'total_tokens': 25, 'input_token_details': {'audio': 0, 'cache_read': 0}, 'output_token_details': {'audio': 0, 'reasoning': 0}})
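The config_factories mechanism above takes the incoming run config and returns extra config entries to merge in before the bound object executes. The pattern can be sketched in plain Python (an illustrative stand-in, not RunnableBinding's actual code; FakeTracer and the helper names are hypothetical):

```python
# Plain-Python sketch of the config_factories idea: each factory takes
# the incoming config and returns extra entries to merge before the
# bound object runs (illustrative only).
def invoke_with_factories(func, config, config_factories):
    merged = dict(config)
    for factory in config_factories:
        extra = factory(config)
        for key, value in extra.items():
            if isinstance(value, list):
                # merge list-valued keys (like "callbacks") instead of replacing
                merged[key] = merged.get(key, []) + value
            else:
                merged[key] = value
    return func(merged)

class FakeTracer:  # hypothetical stand-in for RootListenersTracer
    pass

def model(config):
    # a stand-in "bound" object that reports which callbacks it received
    return [type(cb).__name__ for cb in config.get("callbacks", [])]

result = invoke_with_factories(
    model,
    {"callbacks": []},
    [lambda cfg: {"callbacks": [FakeTracer()]}],
)
print(result)  # ['FakeTracer']
```

Because the factory runs per invocation and receives the live config, the tracer can attach run-specific listeners each time, which is how with_listeners() wires RootListenersTracer into every call.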
Using tracers without LCEL
You can use on_llm_start() and on_llm_end() of RootListenersTracer directly to handle events.
from langchain_core.tracers.schemas import Run
import uuid
from datetime import datetime, timezone
# Modify user-defined listener functions
def onStart(run: Run):
    print(
        f"[START] Run ID: {run.id}, Start time: {run.start_time}\nInput: {run.inputs}"
    )

def onEnd(run: Run):
    # Safely handle output
    print(f"[END] Run ID: {run.id}, End time: {run.end_time}\nOutput: {run.outputs}")

def onError(run: Run):
    print(f"[ERROR] Run ID: {run.id}, Error message: {run.error}")

# Create RootListenersTracer
tracer = RootListenersTracer(
    config={}, on_start=onStart, on_end=onEnd, on_error=onError
)

# Set up LLM
llm = ChatOpenAI()

# Input text
input_text = "What is the founding year of Google?"

try:
    # Create and initialize Run object at the start of execution
    run_id = str(uuid.uuid4())
    start_time = datetime.now(timezone.utc)

    # Create Run object (including only required fields)
    run = Run(
        id=run_id,
        start_time=start_time,
        execution_order=1,
        serialized={},
        inputs={"input": input_text},
        run_type="llm",
    )

    # Call tracer at the start of execution
    tracer.on_llm_start(serialized={}, prompts=[input_text], run_id=run_id)

    # Execute the actual Runnable
    result = llm.generate([input_text])

    # Update Run object
    run.end_time = datetime.now(timezone.utc)
    run.outputs = result

    # Call tracer at the end of execution
    tracer.on_llm_end(response=result, run_id=run_id)
except Exception as e:
    run.error = str(e)
    run.end_time = datetime.now(timezone.utc)
    tracer.on_llm_error(error=e, run_id=run_id)
    print(f"Error occurred: {str(e)}")
[START] Run ID: a76a54b6-8173-4173-b063-ebe107e52dd3, Start time: 2025-01-12 05:32:32.311749+00:00
Input: {'prompts': ['What is the founding year of Google?']}
[END] Run ID: a76a54b6-8173-4173-b063-ebe107e52dd3, End time: 2025-01-12 05:32:32.898851+00:00
Output: {'generations': [[{'text': 'Google was founded on September 4, 1998.', 'generation_info': {'finish_reason': 'stop', 'logprobs': None}, 'type': 'ChatGeneration', 'message': {'lc': 1, 'type': 'constructor', 'id': ['langchain', 'schema', 'messages', 'AIMessage'], 'kwargs': {'content': 'Google was founded on September 4, 1998.', 'additional_kwargs': {'refusal': None}, 'response_metadata': {'token_usage': {'completion_tokens': 13, 'prompt_tokens': 15, 'total_tokens': 28, 'completion_tokens_details': {'accepted_prediction_tokens': 0, 'audio_tokens': 0, 'reasoning_tokens': 0, 'rejected_prediction_tokens': 0}, 'prompt_tokens_details': {'audio_tokens': 0, 'cached_tokens': 0}}, 'model_name': 'gpt-3.5-turbo-0125', 'system_fingerprint': None, 'finish_reason': 'stop', 'logprobs': None}, 'type': 'ai', 'id': 'run-d0c3617b-05c1-4e34-8fa5-eba2ed0f2748-0', 'usage_metadata': {'input_tokens': 15, 'output_tokens': 13, 'total_tokens': 28, 'input_token_details': {'audio': 0, 'cache_read': 0}, 'output_token_details': {'audio': 0, 'reasoning': 0}}, 'tool_calls': [], 'invalid_tool_calls': []}}}]], 'llm_output': {'token_usage': {'completion_tokens': 13, 'prompt_tokens': 15, 'total_tokens': 28, 'completion_tokens_details': {'accepted_prediction_tokens': 0, 'audio_tokens': 0, 'reasoning_tokens': 0, 'rejected_prediction_tokens': 0}, 'prompt_tokens_details': {'audio_tokens': 0, 'cached_tokens': 0}}, 'model_name': 'gpt-3.5-turbo'}, 'run': [{'run_id': UUID('d0c3617b-05c1-4e34-8fa5-eba2ed0f2748')}], 'type': 'LLMResult'}