from dotenv import load_dotenv
load_dotenv(override=True)
True
How to Execute Custom Functions
Important Note
You can wrap custom functions with RunnableLambda to use them in your pipeline. A limitation of RunnableLambda, however, is that the wrapped function can only accept a single argument.
If you have a function that requires multiple parameters, create a wrapper function that:
Accepts a single input (typically a dictionary).
Unpacks this input into multiple arguments inside the wrapper.
Passes these arguments to your original function.
For example:
# Won't work with RunnableLambda
def original_function(arg1, arg2, arg3):
    pass

# Will work with RunnableLambda
def wrapper_function(input_dict):
    return original_function(
        input_dict["arg1"],
        input_dict["arg2"],
        input_dict["arg3"],
    )
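Once wrapped in RunnableLambda, the wrapper is invoked with a single dictionary carrying all three values. A minimal sketch using the illustrative functions above (the keys and values here are placeholders):
from langchain_core.runnables import RunnableLambda

runnable = RunnableLambda(wrapper_function)
# The single dict input is unpacked into three arguments by the wrapper
runnable.invoke({"arg1": "a", "arg2": "b", "arg3": "c"})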
from operator import itemgetter
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnableLambda
from langchain_core.output_parsers import StrOutputParser
from langchain_openai import ChatOpenAI
# Function that returns the length of a text
def length_function(text):
    return len(text)

# Function that multiplies the lengths of two texts
def _multiple_length_function(text1, text2):
    return len(text1) * len(text2)

# Wrapper that accepts a single dict and unpacks it for the two-argument function
def multiple_length_function(_dict):
    return _multiple_length_function(_dict["text1"], _dict["text2"])
# Create a prompt template
prompt = ChatPromptTemplate.from_template("what is {a} + {b}?")
# Initialize the ChatOpenAI model
model = ChatOpenAI(model="gpt-4o-mini", temperature=0)
# Connect the prompt and model to create a chain
chain1 = prompt | model
# Chain configuration
chain = (
    {
        "a": itemgetter("input_1") | RunnableLambda(length_function),
        "b": {"text1": itemgetter("input_1"), "text2": itemgetter("input_2")}
        | RunnableLambda(multiple_length_function),
    }
    | prompt
    | model
    | StrOutputParser()
)
Execute the chain and verify the result. Since len("bar") is 3 and len("bar") * len("gah") is 9, the prompt sent to the model is "what is 3 + 9?".
# Execute the chain with the given arguments.
chain.invoke({"input_1": "bar", "input_2": "gah"})
'3 + 9 equals 12.'
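Note that the dictionary at the head of the chain is automatically coerced into a RunnableParallel, so both the "a" and "b" branches receive the same input dictionary. As a quick check, the "b" branch can be run on its own; a minimal sketch reusing the functions defined above:
from operator import itemgetter
from langchain_core.runnables import RunnableLambda

b_branch = {
    "text1": itemgetter("input_1"),
    "text2": itemgetter("input_2"),
} | RunnableLambda(multiple_length_function)
b_branch.invoke({"input_1": "bar", "input_2": "gah"})
# 9, i.e. len("bar") * len("gah")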
Using RunnableConfig as Parameters
A function wrapped in RunnableLambda can optionally accept a RunnableConfig object as a second parameter.
This allows you to pass various configuration options to nested executions, including:
Callbacks: For tracking and monitoring function execution.
Tags: For labeling and organizing different runs.
Other configuration: Additional settings to control function behavior.
For example, you can use RunnableConfig to:
Track function performance with callbacks.
Add logging capabilities.
Group related operations using tags.
Attach metadata to runs.
Control execution parameters such as max_concurrency and recursion_limit.
This makes RunnableLambda highly configurable for complex workflows with fine-grained control over execution.
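Before the full example, here is a minimal sketch of the mechanism (the function name and tag are illustrative): when the wrapped function declares a parameter typed as RunnableConfig, LangChain injects the configuration passed at invocation.
from langchain_core.runnables import RunnableConfig, RunnableLambda

def show_tags(text: str, config: RunnableConfig):
    # LangChain detects the config parameter and injects the invocation config
    return f"{text} (tags={config.get('tags', [])})"

RunnableLambda(show_tags).invoke("hello", config={"tags": ["demo"]})
# "hello (tags=['demo'])"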
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnableConfig
import json
def parse_or_fix(text: str, config: RunnableConfig):
    # Chain that asks the model to fix the given text
    fixing_chain = (
        ChatPromptTemplate.from_template(
            "Fix the following text:\n\n```text\n{input}\n```\nError: {error}"
            " Don't narrate, just respond with the fixed data."
        )
        | ChatOpenAI(model="gpt-4o-mini", temperature=0)
        | StrOutputParser()
    )
    # Try up to 3 times
    for _ in range(3):
        try:
            # Parse the text as JSON
            return json.loads(text)
        except Exception as e:
            # If parsing fails, call the fixing chain to repair the text
            text = fixing_chain.invoke({"input": text, "error": e}, config)
            print(f"config: {config}")
    # If every attempt fails, return "Failed to parse"
    return "Failed to parse"
from langchain.callbacks import get_openai_callback
with get_openai_callback() as cb:
    # Call the parse_or_fix function using RunnableLambda
    output = RunnableLambda(parse_or_fix).invoke(
        input="{foo:: bar}",
        config={"tags": ["my-tag"], "callbacks": [cb]},  # Pass the config
    )
    # Print the modified result
    print(f"\n\nModified result:\n{output}")
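After the with block exits, the callback handler retains aggregate usage for the runs it observed. A brief follow-up sketch (the exact numbers depend on the model's responses):
# Inspect the usage aggregated by the OpenAI callback handler
print(f"Total tokens: {cb.total_tokens}")
print(f"Total cost (USD): {cb.total_cost}")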