Hello, everyone!
In this tutorial, we will build a shopping mall system using a multi-agent architecture!
By the end of this tutorial, you will become the owner of a very impressive shopping mall!
An impressive owner with several solid agents as employees!
Our shopping mall will be structured with one manager who oversees the purchasing and sales systems,
a purchasing specialist dedicated to the purchasing system,
and a sales specialist dedicated to the sales system.
Each role will perform the following tasks:
Manager
Validate Login: Determines whether the user is a legitimate user of our service.
Assign Tasks: Assigns appropriate tasks to specialists based on user requirements.
Purchase Specialist
Recommend Items: Suggests suitable items based on user needs.
Purchase Items: Buys the items confirmed through recommendations.
Cancel Purchase: Cancels the purchase if the item has not been dispatched. If it is in transit, unfortunately, we will need to follow exchange/refund procedures (not covered in this tutorial due to its extensive nature).
Check Item Status: Shows the status of items purchased by the user. There are four possible statuses: Pre-dispatch, In transit, Delivered, and Cancelled.
Sales Specialist
Check Sales History: For sellers, knowing how many items they have sold is crucial. This function lets them check their sales history.
Restock Inventory: Popular items often sell out quickly. This function allows for restocking.
Update Item Status: When a buyer places an order, we need to dispatch the item. Once safely delivered to the buyer, the shipment status should be updated. This function is for updating the item status.
Once these three roles are implemented, we will have the following type of service.
Our Fabulous Service!
Then, let's embark on building our shopping mall service!
Setting up your environment is the first step. See the Environment Setup guide for more details.
[Note]
The langchain-opentutorial is a package of easy-to-use environment setup guidance, useful functions and utilities for tutorials.
Check out the langchain-opentutorial for more details.
You can set API keys in a .env file or set them manually.
[Note] If you’re not using the .env file, no worries! Just enter the keys directly in the cell below, and you’re good to go.
Data Preparation
Let's prepare our shopping mall data.
First, we will use the fashion-clothing-products-catalog available on Kaggle.
This dataset contains the following information:
productId
productName
productBrand
Gender
Price
Description
Primary Color
There are two ways to obtain the data:
1. Download the data directly from the site. The platform we will be using is Kaggle, which hosts a variety of datasets for data analysis competitions. The dataset we will use can be found at the URL below: Fashion Clothing Products Dataset. There is a detailed description of the dataset on the site, along with various insightful materials. Please refer to them for more information!
2. Use code. Kaggle provides an API, which makes it very easy to download the datasets hosted on Kaggle.
We will use the second method.
To download the dataset, first log in to Kaggle,
then click on your profile picture in the top right corner and select 'Settings' from the menu that appears.
Kaggle Menu
Below 'Settings,' there is an 'API' section where you need to click Create New Token.
Kaggle Settings
Then a file named kaggle.json will be downloaded.
This JSON file is used to authenticate with the Kaggle API.
Once you move the file to the .kaggle folder, you will be ready to use the API. The path is as follows:
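On Linux/macOS it is typically ~/.kaggle/kaggle.json, and on Windows C:\Users\&lt;username&gt;\.kaggle\kaggle.json.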
The data is ready, so we need a database to store it.
We will use PostgreSQL.
Refer to the code below to set up a PostgreSQL database using a Docker container.
However, instead of the plain PostgreSQL image, we will use the pgvector image,
which adds a vector data type to PostgreSQL.
Now that the database is up and running, let's insert our prepared data into the DB!
The database preparation is now complete!
So, how should we insert the data into the database for our service?
In our tutorial, we plan to implement only very simple features, so we have also designed the ERD in a simple way, as shown below.
Our shopping mall service ERD
The most noteworthy column in this configuration is description_ebd in the Inventory table.
This is the reason we use pgvector: the column is used when recommending items.
When a user describes the kind of item they want, we compare the user's description
with description_ebd using cosine distance and recommend the items with the highest similarity.
Creating fictional characters one by one could be one way, but...
Since we have a reliable tool called GPT, let's actively utilize it!
Let's create 10 dummy data entries each for buyers and sellers.
This concludes the preparation of buyer and seller data!
Next, let's load information about the items that the seller will sell into a table called inventory!
Here, we are going to do something a bit interesting...
We will be giving character to our dummy data!
Once character is given, we expect to see unique and lively product descriptions!
Doesn't it sound fun?
Let's create it together!
Shall we check the personalities of our sellers?
Quite unique and colorful sellers have been created!
I look forward to seeing how these unique sellers create product descriptions.
Now all the data preparation is complete! Let's officially start building the service!
Define State
Define Tools
Let's define a tool for administrators.
The tasks that administrators can perform are straightforward:
Verify whether the login is valid. The user ID is read from the config, and the retrieved user information can be utilized throughout the service.
Once the user is verified as valid, the administrator delegates the task to the appropriate specialist.
Let's define the tools for the purchase specialist.
As mentioned in the overview, the purchase specialist can perform the following four tasks:
Recommend products
Purchase products
Cancel purchases
Check product status
Next, let's define the tools for the sales specialist.
The sales specialist can perform the following three tasks:
Checking sales records
Replenishing stock
Updating product conditions
The purchase specialist and the sales specialist share a common tool called CompleteOrEscalate.
This tool signals that the current user has completed their task,
wants to cancel it,
or has changed their mind about it.
In any of these cases, control returns to the administrator.
Define Nodes
Since all the tools have been defined, we need to define the agents that will use these tools!
Administrator Agent
Purchase Expert Agent
Sales Expert Agent
We will define these three types of agents.
Let's define the nodes necessary for these agents.
Build the Graph
(Visualization of the compiled multi-agent graph)
Let's run our service
With this, our shopping mall system is now complete.
Admittedly, it is still quite rough around the edges.
While making this tutorial, I also realized that the example I chose is not a perfect fit for the multi-agent concept.
Still, I believe you were able to grasp the concept of how multi-agents are structured
and how they operate by following this tutorial.
By using the concepts you have learned, I believe you will be able to create more suitable and impressive services!
Thank you for your hard work this time as well!
from dotenv import load_dotenv
from langchain_opentutorial import set_env
# Attempt to load environment variables from a .env file; if unsuccessful, set them manually.
if not load_dotenv():
set_env(
{
"OPENAI_API_KEY": "",
"LANGCHAIN_API_KEY": "",
"LANGCHAIN_TRACING_V2": "true",
"LANGCHAIN_ENDPOINT": "https://api.smith.langchain.com",
"LANGCHAIN_PROJECT": "MultiAgentShoppingMallSystem", # set the project name same as the title
}
)
from kaggle.api.kaggle_api_extended import KaggleApi
from glob import glob
import pandas as pd
import numpy as np
api = KaggleApi()
api.authenticate()
api.dataset_download_files('shivamb/fashion-clothing-products-catalog', unzip=True)
Warning: Your Kaggle API key is readable by other users on this system! To fix this, you can run 'chmod 600 /Users/kimheeah/.kaggle/kaggle.json'
Dataset URL: https://www.kaggle.com/datasets/shivamb/fashion-clothing-products-catalog
data = glob('./*.csv')
data
['./myntra_products_catalog.csv']
df = pd.read_csv(data[0])
# Since the `np.int64` type causes errors when loading data into PostgreSQL, convert it to `str`
# (the database will handle type conversion automatically during insertion).
df = df.map(lambda x: str(x))
df.head()
df.loc[0,'ProductID']
'10017413'
# Let's use exactly 100 pieces of data for convenience, as the dataset is quite large.
df = df.iloc[:100]
### Since our service will not display images, let's remove unnecessary columns.
df = df.drop(columns=['NumImages'])
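The cells below assume an open database connection (conn and cursor) and that the tables from the ERD already exist. The Docker-based pgvector setup mentioned earlier is not reproduced in this section, so here is a minimal sketch; the pgvector/pgvector:pg16 image, the credentials, and the exact DDL (key constraints, the password column, and the 1536-dimension vector size of text-embedding-3-small) are assumptions that may differ from the full notebook.
# Hypothetical one-time container setup (run in a terminal):
#   docker run --name shopping-mall-db -e POSTGRES_PASSWORD=postgres -p 5432:5432 -d pgvector/pgvector:pg16
import psycopg2

# Placeholder connection details; adjust them to match your container.
conn = psycopg2.connect(
    host="localhost",
    port=5432,
    user="postgres",
    password="postgres",
    dbname="postgres",
)
cursor = conn.cursor()

# Enable pgvector so the vector column type is available, then create the tables from the ERD.
cursor.execute("CREATE EXTENSION IF NOT EXISTS vector;")
cursor.execute("""
CREATE TABLE IF NOT EXISTS ServiceUser (
    id       VARCHAR PRIMARY KEY,
    password VARCHAR,   -- assumed from the sample row shown later
    name     VARCHAR,
    gender   VARCHAR,
    type     VARCHAR    -- 'customer' or 'vendor'
);
CREATE TABLE IF NOT EXISTS Product (
    id            INT8 PRIMARY KEY,
    name          VARCHAR,
    brand         VARCHAR,
    gender        VARCHAR,
    price         INTEGER,
    description   TEXT,
    primary_color VARCHAR
);
CREATE TABLE IF NOT EXISTS Inventory (
    id              SERIAL PRIMARY KEY,            -- referenced later as inventory_id
    user_id         VARCHAR REFERENCES ServiceUser(id),
    product_id      INT8 REFERENCES Product(id),
    remains         INTEGER,
    price           INTEGER,
    description     TEXT,
    description_ebd VECTOR(1536)                   -- text-embedding-3-small returns 1536-dim vectors
);
CREATE TABLE IF NOT EXISTS Purchase (
    customer_id  VARCHAR REFERENCES ServiceUser(id),
    inventory_id INTEGER REFERENCES Inventory(id),
    status       VARCHAR   -- Pre-dispatch / In transit / Delivered / Cancelled
);
""")
conn.commit()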
### Insert Data
from psycopg2.extras import execute_values
insert_list = [tuple(df.iloc[i]) for i in range(len(df))]
query = f"INSERT INTO Product VALUES %s;"
try:
execute_values(cursor, query, insert_list)
conn.commit()
print("Data insertion completed.")
except Exception as e:
conn.rollback()
print(f"An error occurred: {e}")
Data insertion completed.
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
import os
llm = ChatOpenAI(
model="gpt-4o-mini",
temperature=0.7,
api_key=os.environ["OPENAI_API_KEY"],
)
prompt = ChatPromptTemplate.from_template(
"""
You are an expert in generating PostgreSQL queries. Please review the given table information and column details,
and write appropriate queries based on the user's questions.
Do not wrap in MD code blocks, do not insert newline characters with \n, just return the SQL query statement only.
{db_info}
{user_input}
"""
)
chain = prompt | llm
insert_user_queries = chain.invoke({
"db_info":"""
# Table information
- ServiceUser: A table that contains user information.
# Column information
- id (varchar): The user's site ID.
- name (varchar): The user's name.
- gender (varchar): The user's gender.
- type (varchar): The user's type. Please just insert TYPE.
"""
,"user_input":"""
# User's Question
Please return 20 individual insert queries generating dummy data.
Please separate each query with \n.
"""}).content
insert_user_queries
count = 0
try:
    # The model returns one INSERT statement per line, so split the response on newlines.
    for insert_user_query in insert_user_queries.split('\n'):
if count<10:
count += 1
insert_user_query = insert_user_query.replace('TYPE', 'customer')
else:
insert_user_query = insert_user_query.replace('TYPE', 'vendor')
cursor.execute(insert_user_query)
conn.commit()
print("ServiceUser data insertion completed.")
except Exception as e:
conn.rollback()
print(f"An error occurred: {e}")
ServiceUser data insertion completed.
query = "SELECT * FROM ServiceUser Where type='vendor';"
try:
cursor.execute(query)
conn.commit()
vendors= cursor.fetchall()
print("Vendor Data selection completed.")
except Exception as e:
conn.rollback()
print(f"An error occurred: {e}")
print("vendor data : ",vendors[0])
Vendor Data selection completed.
vendor data : ('user11', 'pass11', 'Ivy Carter', 'Female', 'vendor')
personalities = {}
for vendor in vendors:
personalities[vendor[2]] = llm.invoke(f"""
The seller's name is {vendor[2]} and the gender is {vendor[3]}, please assign a random personality in one line.
Please ensure that the given personality is different from the other sellers provided below.
# Personalities generated for sellers so far
{personalities}
""").content
personalities
{'Ivy Carter': 'Ivy Carter is an adventurous and free-spirited individual who thrives on spontaneity and exploration.',
'Jack Thompson': 'Jack Thompson is a meticulous and detail-oriented planner who values structure and organization in all aspects of life.',
'Kathy Martinez': 'Kathy Martinez is an empathetic and nurturing soul who finds joy in connecting with others and fostering supportive relationships.',
'Leo Robinson': 'Leo Robinson is a charismatic and outgoing communicator who loves engaging with people and bringing a sense of enthusiasm to every interaction.',
'Mia White': 'Mia White is a creative and imaginative thinker who enjoys expressing herself through art and innovative ideas, often inspiring those around her.',
'Noah Lewis': 'Noah Lewis is a pragmatic and resourceful problem-solver who approaches challenges with a calm demeanor and a focus on practical solutions.',
'Olivia Hall': 'Olivia Hall is a whimsical and playful dreamer who delights in storytelling and finding magic in the everyday moments of life.',
'Paul Young': 'Paul Young is a driven and ambitious strategist who is always looking for opportunities to innovate and achieve his goals with determination.',
'Quinn Allen': 'Quinn Allen is a thoughtful and introspective individual who enjoys deep conversations and finding meaning in every experience, often reflecting on the world around her.',
'Ryan King': 'Ryan King is a witty and humorous individual who uses laughter as a way to connect with others and lighten any situation.'}
from langchain_openai import OpenAIEmbeddings
# We use the text-embedding-3-small model as the embedding model.
embeddings = OpenAIEmbeddings(
model="text-embedding-3-small",
)
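As a quick optional sanity check (the sample text below is just a placeholder), we can confirm the embedding dimensionality that the description_ebd column will store:
sample_vector = embeddings.embed_query("a black cotton t-shirt")
len(sample_vector)   # text-embedding-3-small returns 1536-dimensional vectors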
# Execution time approximately 5 to 10 minutes
import re
import json
# Apostrophes in product descriptions (e.g. Men's) are not distinguished from the single quotes that delimit SQL strings and cause errors, so they need separate handling.
def escape_single_quotes(sql_query):
"""
Replace single quotes that meet specific conditions within an SQL query with two single quotes.
Condition: Only replace single quotes that are not preceded or followed by (, ), , or space.
"""
return re.sub(r"(?<![()\',\s])'(?=[^()\',\s]|$)", r"''", sql_query)
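# For example (hypothetical strings): escape_single_quotes("Men's Solid Polo") returns "Men''s Solid Polo",
# while a phrase like "the 'premium' line" is left unchanged because its quotes are adjacent to spaces.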
ex_print = True
for vendor in vendors:
count=np.random.randint(5, 10)
query = f"SELECT * FROM product ORDER BY RANDOM() LIMIT {count};"
try:
cursor.execute(query)
products = cursor.fetchall()
conn.commit()
print("Product data selection completed.")
except Exception as e:
conn.rollback()
print(f"An error occurred: {e}")
for i in range(len(products)):
try:
insert_inventory_dict = chain.invoke({
"db_info":"""
# Table and Column Information
### Table Information 1
- ServiceUser: A table that contains information about sellers.
### Column Information 1
- id (varchar): The site ID of the seller.
- name (varchar): The name of the seller.
- gender (varchar): The gender of the seller.
- type (varchar): A column indicating the seller.
### Table Information 2
- Product: A table that contains information about products.
### Column Information 2
- id (int8): The product ID.
- name (varchar): The product name.
- brand (varchar): The product brand.
- gender (varchar): The product gender classification.
- price (integer): The cost price of the product.
- description (text): The product description.
- primary_color (varchar): The color of the product.
### Table Information 3
- Inventory: A table that contains information about the products each seller has registered for sale.
### Column Information 3
- user_id (varchar): The ID of the seller supplying the product. Refers to the id in the ServiceUser table.
- product_id (int8): The product ID. Refers to the id in the Product table.
- remains (integer): The remaining quantity.
- price (integer): The sale price of the product.
- description (text): The product description.
- description_ebd (vector) : Product description embeddings. Please just insert EBD Without single quote.
"""
,"user_input":f"""
# User Question
Using the given seller DB information and the nature of the product DB information,
generate an SQL query to insert data into the Inventory table,
which contains information about products to be registered by the seller to the service.
Seller DB information
: {"ServiceUser Values ",vendor}
Seller personality
: {personalities[vendor[2]]}
Product DB information
: {"Product Values ",products[i]}
At this time, please insert a random value between 1 and 10 for the remains.
Set the price higher than the cost price based on the registered information,
determining the margin according to the seller's nature.
Modify and add the description based on the registered information,
excluding any seller-specific information but reflecting the seller's character and personality.
Please follow the output format for the response.
"""+
"""
# Output Format
{"query" : "{Generated query}", "description" : "{Generated description sentence}"}
"""}).content
insert_inventory_dict = json.loads(insert_inventory_dict)
insert_inventory_query = escape_single_quotes(insert_inventory_dict['query'])
description_ebd = embeddings.embed_query(insert_inventory_dict["description"])
insert_inventory_query = insert_inventory_query.replace("EBD", "'"+str(description_ebd)+"'")
cursor.execute(insert_inventory_query)
conn.commit()
print(f"{i+1}th data insertion completed out of {len(products)}th data")
except Exception as e:
conn.rollback()
print(f"An error occurred: {e}")
from pydantic import BaseModel, Field
from typing import Annotated, Literal, Optional, Callable
from typing_extensions import TypedDict
from langchain_core.messages import ToolMessage
from langchain_core.runnables import Runnable, RunnableConfig, RunnableLambda
from langchain_core.tools import tool
from langchain_core.runnables.config import RunnableConfig
from langgraph.graph.message import AnyMessage, add_messages
from langgraph.prebuilt import ToolNode
from langgraph.graph import StateGraph, START, END
from langgraph.checkpoint.memory import MemorySaver
from langgraph.prebuilt import tools_condition
# When the administrator delegates a task, the graph switches to the purchase-specialist or sales-specialist state,
# and that state is pushed onto the dialog stack so we know which assistant is currently handling the conversation.
# Once the specialist has finished everything it can do and control returns to the administrator,
# the stack is popped so the administrator can refocus on its own role. (Refer to pop_dialog_state.)
def update_dialog_stack(left: list[str], right: Optional[str]) -> list[str]:
"""Push or pop the state."""
if right is None:
return left
if right == "pop":
return left[:-1]
return left + [right]
class User(BaseModel):
id: str
name: str
gender: str
type: str
class State(TypedDict):
user_info: Annotated[User, "User information"]
messages: Annotated[list[AnyMessage], add_messages]
dialog_state: Annotated[
list[
Literal[
"customer",
"vendor",
]
],
update_dialog_stack,
]
@tool
def check_validation(
config: RunnableConfig):
"""
    Verify the validity by checking whether a matching ID exists in the user information table.
"""
query = f"SELECT * FROM serviceuser WHERE id='{config['configurable']['user_id']}';"
user = None
try:
cursor.execute(query)
row = cursor.fetchone()
colnames = [desc[0] for desc in cursor.description]
user_info = dict(zip(colnames, row))
user = User(**user_info)
conn.commit()
except Exception as e:
conn.rollback()
return user
class ToSalesSpecialistAssistant(BaseModel):
"""Transfers work to a specialized assistant to handle sales service."""
request: str = Field(
description="Any necessary followup questions the update sales specailist should clarify before proceeding."
)
class ToPurchaseSpecialistAssistant(BaseModel):
"""Transfers work to a specialized assistant to handle purchase service."""
request: str = Field(
description="Any necessary followup questions the update purchase specailist should clarify before proceeding."
)
@tool
def get_recommendation(query: Annotated[str, "User's question"],
color:Annotated[str, "The color of the product the user wants. If it does not exist, please fill it with None."],
count:Annotated[int,"Number of recommended products. If it does not exist, please fill it with None."]):
"""
Recommend products to the user.
"""
query = f"""SELECT p.name, p.brand, p.gender, i.price, i.description
FROM inventory i
JOIN product p on p.id = i.product_id
WHERE 1=1"""
if color != 'None' and color is not None:
query += f" AND REPLACE(lower(p.primary_color), ' ', '') = REPLACE(lower('{color}'), ' ', '')"
# <=> : Get COSINE distance
query += " ORDER BY i.description_ebd <=> '"+str(embeddings.embed_query(query))+"'"
if count != 'None' and count is not None:
query += f" LIMIT {count};"
else:
query += f" LIMIT 5;"
print(query)
try:
        cursor.execute(sql)
recommendations = cursor.fetchall()
conn.commit()
return recommendations
except Exception as e:
conn.rollback()
return "The recommendation process encountered an error and has been canceled."
@tool
def purchase_product(inventory_id:Annotated[str, "Inventory product's id"],
config:RunnableConfig):
"""
Purchase products.
"""
my_id = config['configurable']['user_id']
query = f"INSERT INTO purchase({my_id}, {inventory_id}, 'Pre-dispatch')"
try:
cursor.execute(query)
conn.commit()
return "The purchase has been completed."
except Exception as e:
conn.rollback()
return "The purchase was cancelled due to a problem."
@tool
def cancel_purchase(inventory_id:Annotated[str, "Inventory product's id"],
config:RunnableConfig):
"""
Canceling the purchase of the product.
"""
my_id = config['configurable']['user_id']
query = f"UPDATE purchase SET status='cancelled' WHERE customer_id = '{my_id}' and inventory_id = '{inventory_id}'"
try:
cursor.execute(query)
conn.commit()
return "The purchase cancellation has been completed."
except Exception as e:
conn.rollback()
return "There was an issue with the cancellation, so it was canceled."
@tool
def check_item_status(config:RunnableConfig):
"""
Checking the condition of the purchased product.
"""
my_id = config['configurable']['user_id']
query = f"SELECT * FROM purchase WHERE customer_id = '{my_id}'"
try:
cursor.execute(query)
purchase_history = cursor.fetchall()
conn.commit()
return purchase_history
except Exception as e:
conn.rollback()
return "There was an issue with the data retrieval."
@tool
def check_sales_record(config:RunnableConfig):
"""
Checking sales records.
"""
my_id = config['configurable']['user_id']
query = f"SELECT * FROM purchase p JOIN inventory i ON i.id = p.inventory_id WHERE i.vendor_id = '{my_id}'"
try:
cursor.execute(query)
purchase_history = cursor.fetchall()
conn.commit()
return purchase_history
except Exception as e:
conn.rollback()
return "There was an issue with the data retrieval."
@tool
def restock(product_id:Annotated[str, "Product's id"],
count:Annotated[int, "Number of stocks to replenish."],
config:RunnableConfig):
"""
Replenishing product stock.
"""
my_id = config['configurable']['user_id']
query = f"UPDATE inventory SET remains = remains + {count} WHERE user_id = '{my_id}' AND product_id = '{product_id}'"
try:
cursor.execute(query)
conn.commit()
return "Stock replenishment is complete."
except Exception as e:
conn.rollback()
return "There was an issue during stock replenishment, and it has been canceled."
@tool
def update_item_status(inventory_id:Annotated[str, "Inventory product's id"],
status:Annotated[Literal['In transit', 'Delivered', 'Cancelled'], "The product status can be 'In transit', 'Delivered', or 'Cancelled'."]):
"""
Updating the status of the product.
The status can be updated to one of the following three:
- In transit: The product has been shipped from the warehouse.
- Delivered: The product has been delivered to the customer.
- Cancelled: The order has been cancelled by the customer.
"""
query = f"UPDATE purchase SET statue='{status}' FROM inventory WHERE purchase.inventory_id = inventory.id AND purchase.inventory_id = '{inventory_id}'"
try:
cursor.execute(query)
conn.commit()
return "The stock replenishment is complete."
except Exception as e:
conn.rollback()
return "The stock replenishment was canceled due to a problem."
return
class CompleteOrEscalate(BaseModel):
"""A tool to mark the current task as completed and/or to escalate control of the dialog to the main assistant,
who can re-route the dialog based on the user's needs."""
cancel: bool = True
reason: str
class Config:
json_schema_extra = {
"example": {
"cancel": True,
"reason": "User changed their mind about the current task.",
},
"example 2": {
"cancel": True,
"reason": "I have fully completed the task.",
},
"example 3": {
"cancel": False,
"reason": "I need to search the items for more information.",
},
}
primary_assistant_prompt = ChatPromptTemplate.from_messages(
[
(
"system",
"You are an excellent customer support representative for a clothing shopping mall service."
"Your main role is to perform login verification to ensure that the user is authorized to use the service. \
Once verified, you delegate tasks to the appropriate expert based on the user's requested service."
"There are two experts you can delegate tasks to:"
"1. purchase_specialist : An expert who provides appropriate services to consumers wishing to purchase items."
"2. sales_specialist: An expert who offers appropriate services to sellers wishing to sell items."
"Delegate the next action to the experts based on the user's request. If the process is complete, respond with END."
),
("placeholder", "{messages}"),
]
)
primary_assistant_tools = [
check_validation
]
assistant_runnable = primary_assistant_prompt | llm.bind_tools(
primary_assistant_tools
+ [
ToSalesSpecialistAssistant,
ToPurchaseSpecialistAssistant,
]
)
purchase_specialist_prompt = ChatPromptTemplate.from_messages(
[
(
"system",
"You are an expert specialized in customer services."
"The primary assistant delegates tasks to you when the user wants an item recommendation, a purchase, \
a purchase cancellation, or to check purchase history."
"If the user needs help, but you do not have the appropriate tool, 'CompleteOrEscalate' to the host assistant."
"Do not waste the user's time, and do not create incorrect tools or functions."
),
("placeholder", "{messages}"),
]
)
purchase_specialist_tools = [get_recommendation, purchase_product, cancel_purchase, check_item_status]
purchase_specialist_runnable = purchase_specialist_prompt | llm.bind_tools(
purchase_specialist_tools + [CompleteOrEscalate]
)
sales_specialist_prompt = ChatPromptTemplate.from_messages(
[
(
"system",
"You are an expert specialized in vendor services."
"The primary assistant delegates tasks to you when the user wants to check item sales history, \
restock inventory, or update item status."
"If the user needs help, but you do not have the appropriate tool, 'CompleteOrEscalate' to the host assistant."
"Do not waste the user's time, and do not create incorrect tools or functions."
),
("placeholder", "{messages}"),
]
)
sales_specialist_tools = [check_sales_record, restock, update_item_status]
sales_specialist_runnable = sales_specialist_prompt | llm.bind_tools(
sales_specialist_tools + [CompleteOrEscalate]
)
class Assistant:
def __init__(self, runnable: Runnable):
self.runnable = runnable
def __call__(self, state: State):
while True:
result = self.runnable.invoke(state)
if not result.tool_calls and (
not result.content
or isinstance(result.content, list)
and not result.content[0].get("text")
):
messages = state["messages"] + [("user", "Respond with a real output.")]
state = {**state, "messages": messages}
else:
break
return {"messages": result}
# Validate the user's information. After validation, retrieve the user's information.
def get_user_info_node(state: State):
return {"user_info": check_validation.invoke({})}
# Creates a node announcing that the administrator has delegated the task and pushes the new dialog state (purchase specialist or sales specialist).
def create_entry_node(assistant_name: str, new_dialog_state: str) -> Callable:
def entry_node(state: State) -> dict:
tool_call_id = state["messages"][-1].tool_calls[0]["id"]
return {
"messages": [
ToolMessage(
content=f"The assistant is now the {assistant_name}. Reflect on the above conversation between the host assistant and the user."
f" The user's intent is unsatisfied. Use the provided tools to assist the user. Remember, you are {assistant_name},"
" and the booking, update, other other action is not complete until after you have successfully invoked the appropriate tool."
" If the user changes their mind or needs help for other tasks, call the CompleteOrEscalate function to let the primary host assistant take control."
" Do not mention who you are - just act as the proxy for the assistant.",
tool_call_id=tool_call_id,
)
],
"dialog_state": new_dialog_state,
}
return entry_node
# When the specialist has completed its tasks and control returns to the administrator, the specialist's dialog state is popped off the stack.
def pop_dialog_state(state: State) -> dict:
"""Pop the dialog stack and return to the main assistant.
This lets the full graph explicitly track the dialog flow and delegate control
to specific sub-graphs.
"""
messages = []
if state["messages"][-1].tool_calls:
messages.append(
ToolMessage(
content="Resuming dialog with the host assistant. Please reflect on the past conversation and assist the user as needed.",
tool_call_id=state["messages"][-1].tool_calls[0]["id"],
)
)
return {
"dialog_state": "pop",
"messages": messages,
}
# Graph branching settings for sales_specialist.
def route_sales_specialist(
state: State,
):
route = tools_condition(state)
if route == END:
return END
tool_calls = state["messages"][-1].tool_calls
did_cancel = any(tc["name"] == CompleteOrEscalate.__name__ for tc in tool_calls)
if did_cancel:
return "leave_skill"
return "sales_specialist_tools"
# Graph branching settings for purchase_specialist.
def route_purchase_specialist(
state: State,
):
route = tools_condition(state)
if route == END:
return END
tool_calls = state["messages"][-1].tool_calls
did_cancel = any(tc["name"] == CompleteOrEscalate.__name__ for tc in tool_calls)
if did_cancel:
return "leave_skill"
return "purchase_specialist_tools"
# Graph branching settings for primary_assistant.
def route_primary_assistant(
state: State
):
route = tools_condition(state)
if route == END:
return END
tool_calls = state["messages"][-1].tool_calls
if tool_calls:
if tool_calls[0]["name"] == ToPurchaseSpecialistAssistant.__name__:
if state['user_info'] is None:
raise ValueError("Unauthorized user")
return "enter_purchase_specialist"
elif tool_calls[0]["name"] == ToSalesSpecialistAssistant.__name__:
if state['user_info'] is None:
raise ValueError("Unauthorized user")
return "enter_sales_specialist"
return "primary_assistant_tools"
raise ValueError("Invalid route")
# If an error occurs after calling the tool, it returns the cause of the error.
def handle_tool_error(state) -> dict:
error = state.get("error")
tool_calls = state["messages"][-1].tool_calls
return {
"messages": [
ToolMessage(
content=f"Error: {repr(error)}\n please fix your mistakes.",
tool_call_id=tc["id"],
)
for tc in tool_calls
]
}
def create_tool_node_with_fallback(tools: list) -> dict:
return ToolNode(tools).with_fallbacks(
[RunnableLambda(handle_tool_error)], exception_key="error"
)
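The cells that assemble and run the graph are not reproduced above, so here is a minimal sketch of how the nodes and routing functions defined earlier could be wired together. The node names are taken from the routing functions; the exact wiring in the full notebook may differ slightly.
builder = StateGraph(State)

# Fetch and validate the user's information before anything else.
builder.add_node("fetch_user_info", get_user_info_node)
builder.add_edge(START, "fetch_user_info")

# Administrator (primary assistant) and its tools.
builder.add_node("primary_assistant", Assistant(assistant_runnable))
builder.add_node("primary_assistant_tools", create_tool_node_with_fallback(primary_assistant_tools))
builder.add_edge("fetch_user_info", "primary_assistant")
builder.add_conditional_edges(
    "primary_assistant",
    route_primary_assistant,
    ["enter_purchase_specialist", "enter_sales_specialist", "primary_assistant_tools", END],
)
builder.add_edge("primary_assistant_tools", "primary_assistant")

# Purchase specialist sub-graph (dialog state: "customer").
builder.add_node("enter_purchase_specialist", create_entry_node("Purchase Specialist", "customer"))
builder.add_node("purchase_specialist", Assistant(purchase_specialist_runnable))
builder.add_node("purchase_specialist_tools", create_tool_node_with_fallback(purchase_specialist_tools))
builder.add_edge("enter_purchase_specialist", "purchase_specialist")
builder.add_conditional_edges(
    "purchase_specialist",
    route_purchase_specialist,
    ["purchase_specialist_tools", "leave_skill", END],
)
builder.add_edge("purchase_specialist_tools", "purchase_specialist")

# Sales specialist sub-graph (dialog state: "vendor").
builder.add_node("enter_sales_specialist", create_entry_node("Sales Specialist", "vendor"))
builder.add_node("sales_specialist", Assistant(sales_specialist_runnable))
builder.add_node("sales_specialist_tools", create_tool_node_with_fallback(sales_specialist_tools))
builder.add_edge("enter_sales_specialist", "sales_specialist")
builder.add_conditional_edges(
    "sales_specialist",
    route_sales_specialist,
    ["sales_specialist_tools", "leave_skill", END],
)
builder.add_edge("sales_specialist_tools", "sales_specialist")

# Return control to the administrator once a specialist is done.
builder.add_node("leave_skill", pop_dialog_state)
builder.add_edge("leave_skill", "primary_assistant")

memory = MemorySaver()
graph = builder.compile(checkpointer=memory)
With the graph compiled, we can visualize it and run a sample conversation. The user_id, thread_id, and question below are placeholder values; user1 is assumed to exist in the ServiceUser table.
from IPython.display import Image, display

display(Image(graph.get_graph().draw_mermaid_png()))  # the graph visualization shown earlier

config = {
    "configurable": {
        "user_id": "user1",   # read by check_validation and the specialist tools
        "thread_id": "1",     # required by the MemorySaver checkpointer
    }
}
for event in graph.stream(
    {"messages": [("user", "Please recommend a black t-shirt.")]},
    config,
    stream_mode="values",
):
    event["messages"][-1].pretty_print()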