This tutorial covers how to use Elasticsearch with LangChain.
It walks you through CRUD operations with Elasticsearch: storing, updating, and deleting documents, and performing similarity-based retrieval.
Table of Contents
References
Environment Setup
[Note]
langchain-opentutorial is a package that provides easy-to-use environment setup, along with useful functions and utilities for tutorials.
Elasticsearch is an open-source, distributed search and analytics engine designed to store, search, and analyze both structured and unstructured data in real time.
Key Features
Real-Time Search: Instantly searchable data upon ingestion
Large-Scale Data Processing: Efficient handling of vast datasets
Scalability: Flexible scaling through clustering and distributed architecture
Versatile Search Support: Keyword search, semantic search, and multimodal search
Use Cases
Log Analytics: Real-time monitoring of system and application logs
Monitoring: Server and network health tracking
Product Recommendations: Behavior-based recommendation systems
Natural Language Processing (NLP): Semantic text searches
Multimodal Search: Text-to-image and image-to-image searches
Vector Database Functionality in Elasticsearch
Elasticsearch supports vector data storage and similarity search via Dense Vector Fields. As a vector database, it excels in applications like NLP, image search, and recommendation systems.
Core Vector Database Features
Dense Vector Field: Store and query high-dimensional vectors
KNN (k-Nearest Neighbors) Search: Find vectors most similar to the input
Multimodal Search: Combine text and image data for advanced search capabilities
Vector Search Use Cases
Semantic Search: Understand user intent and deliver precise results
Text-to-Image Search: Retrieve relevant images from textual descriptions
Image-to-Image Search: Find visually similar images in a dataset
Elasticsearch goes beyond traditional text search engines, offering robust vector database capabilities essential for NLP and multimodal search applications.
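To make this concrete, below is a minimal sketch of what a dense vector field and a kNN query look like in the raw Elasticsearch API. The index name, field names, and dimension (3072, matching OpenAI's text-embedding-3-large) are assumptions for illustration only; the tutorial's helper classes below take care of this for you.
# Illustrative only: a dense_vector mapping and kNN search body (Elasticsearch 8.x).
mapping = {
    "properties": {
        "text": {"type": "text"},
        "vector": {
            "type": "dense_vector",
            "dims": 3072,  # must match the embedding model's output size
            "index": True,
            "similarity": "cosine",
        },
    }
}
# client.indices.create(index="my_vectors", mappings=mapping)

query_vector = [0.0] * 3072  # placeholder; normally the embedded query text
# client.search(
#     index="my_vectors",
#     knn={
#         "field": "vector",
#         "query_vector": query_vector,
#         "k": 3,
#         "num_candidates": 50,
#     },
# )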
Prepare Data
This section guides you through the data preparation process.
This section includes the following components:
Data Introduction
Preprocess Data
Data Introduction
In this tutorial, we will use the fairy tale The Little Prince in PDF format as our data.
This material complies with the Apache 2.0 license.
The data is used in a text (.txt) format converted from the original PDF.
You can view the data at the link below.
Preprocess Data
In this tutorial section, we will preprocess the text data from The Little Prince and convert it into a list of LangChain Document objects with metadata.
Each document chunk will include a title field in the metadata, extracted from the first line of each section.
from langchain.schema import Document
from langchain.text_splitter import RecursiveCharacterTextSplitter
import re
from typing import List


def preprocessing_data(content: str) -> List[Document]:
    # 1. Split the text by double newlines to separate sections
    blocks = content.split("\n\n")

    # 2. Initialize the text splitter
    text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=500,  # Maximum number of characters per chunk
        chunk_overlap=50,  # Overlap between chunks to preserve context
        separators=["\n\n", "\n", " "],  # Order of priority for splitting
    )

    documents = []

    # 3. Loop through each section
    for block in blocks:
        lines = block.strip().splitlines()
        if not lines:
            continue

        # Extract the title from the first line using square brackets [ ]
        first_line = lines[0]
        title_match = re.search(r"\[(.*?)\]", first_line)
        title = title_match.group(1).strip() if title_match else ""

        # Remove the title line from the content
        body = "\n".join(lines[1:]).strip()
        if not body:
            continue

        # 4. Chunk the section using the text splitter
        chunks = text_splitter.split_text(body)

        # 5. Create a LangChain Document for each chunk with the same title metadata
        for chunk in chunks:
            documents.append(Document(page_content=chunk, metadata={"title": title}))

    print(f"Generated {len(documents)} chunked documents.")
    return documents
# Load the entire text file
with open("./data/the_little_prince.txt", "r", encoding="utf-8") as f:
    content = f.read()

# Preprocess Data
docs = preprocessing_data(content=content)
Generated 262 chunked documents.
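To sanity-check the result, you can peek at one of the generated chunks and confirm the title metadata was extracted:
# Inspect the first chunk and its metadata
print(docs[0].metadata)
print(docs[0].page_content[:100])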
Setting up Elasticsearch
This part walks you through the initial setup of Elasticsearch.
This section includes the following components:
Load Embedding Model
Load Elasticsearch Client
Load Embedding Model
In this section, you'll learn how to load an embedding model.
This tutorial uses OpenAI's embedding model, which requires an OpenAI API key.
💡 If you prefer to use another embedding model, see the instructions below.
from langchain_openai import OpenAIEmbeddings
embedding = OpenAIEmbeddings(model="text-embedding-3-large")
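As a quick, optional sanity check, you can embed a short string and confirm the vector dimension (3072 for text-embedding-3-large):
# Optional: verify the embedding model returns 3072-dimensional vectors
vector = embedding.embed_query("hello")
print(len(vector))  # 3072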
Load Elasticsearch Client
In this section, we'll show you how to load the database client object using the Python SDK for Elasticsearch.
import os
import logging
from elasticsearch import Elasticsearch, exceptions as es_exceptions
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def get_db_client(
    es_url: str = None,
    api_key: str = None,
    timeout: int = 120,
    retry_on_timeout: bool = True,
) -> Elasticsearch:
    """
    Initializes and returns an Elasticsearch client instance.

    This function loads configuration (e.g., API key, host) from environment
    variables or default values and creates a client object to interact
    with the Elasticsearch Python SDK.

    Args:
        es_url (str): Elasticsearch URL. If None, uses 'ES_URL' env var.
        api_key (str): API key. If None, uses 'ES_API_KEY' env var.
        timeout (int): Request timeout in seconds.
        retry_on_timeout (bool): Whether to retry on timeout.

    Returns:
        Elasticsearch: An instance of the Elasticsearch client.

    Raises:
        ValueError: If required configuration is missing.
        es_exceptions.ConnectionError: If connection fails.
    """
    es_url = es_url or os.getenv("ES_URL")
    api_key = api_key or os.getenv("ES_API_KEY")
    if not es_url or not api_key:
        raise ValueError("Elasticsearch URL and API key must be provided.")

    client = Elasticsearch(
        es_url,
        api_key=api_key,
        request_timeout=timeout,
        retry_on_timeout=retry_on_timeout,
    )

    try:
        if client.ping():
            logger.info("✅ Successfully connected to Elasticsearch!")
        else:
            logger.error("❌ Failed to connect to Elasticsearch (ping returned False).")
            raise es_exceptions.ConnectionError("Failed to connect to Elasticsearch.")
    except Exception as e:
        logger.error(f"❌ Elasticsearch connection error: {e}")
        raise

    return client


client = get_db_client()
INFO:elastic_transport.transport:HEAD https://de7f5f4e2c8a452597e8e4db54c98b30.us-central1.gcp.cloud.es.io:443/ [status:200 duration:0.603s]
INFO:__main__:✅ Successfully connected to Elasticsearch!
Create Index
If you are successfully connected to Elasticsearch, some basic indexes already exist.
In this tutorial, however, we will create a new index using the ElasticsearchIndexManager class.
from utils.elasticsearch import ElasticsearchIndexManager

# Create IndexManager object
index_manager = ElasticsearchIndexManager(client)

# Create a new index
index_name = "langchain_tutorial_es"
tutorial_index = index_manager.create_index(
    embedding, index_name=index_name, metric="cosine"
)
print(tutorial_index)
✅ Index 'langchain_tutorial_es' created successfully.
{'status': 'created', 'index_name': 'langchain_tutorial_es', 'embedding_dims': 3072, 'metric': 'cosine'}
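You can optionally confirm the index was created using the raw client (a standard Elasticsearch API call):
# Optional: verify the new index exists
print(bool(client.indices.exists(index=index_name)))  # True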
Document Manager
For the LangChain-OpenTutorial, we have implemented a custom set of CRUD functionalities for VectorDBs.
The following operations are included:
upsert : Update existing documents or insert if they don't exist
upsert_parallel : Perform upserts in parallel for large-scale data
similarity_search : Search for similar documents based on embeddings
delete : Remove documents based on filter conditions
Each of these features is implemented as class methods specific to each VectorDB.
In this tutorial, you'll learn how to use these methods to interact with your VectorDB.
We plan to continuously expand the functionality by adding more common operations in the future.
Create Instance
First, create an instance of the Elasticsearch helper class to use its CRUD functionalities.
This class is initialized with the Elasticsearch Python SDK client instance and the embedding model instance, both of which were defined in the previous section.
# import ElasticsearchDocumentManager
from utils.elasticsearch import ElasticsearchDocumentManager
# connect to tutorial_index
crud_manager = ElasticsearchDocumentManager(
    client=client, index_name=index_name, embedding=embedding
)
Now you can use the following CRUD operations with the crud_manager instance.
This instance allows you to easily manage documents in your Elasticsearch index.
Upsert Document
Update existing documents or insert if they don't exist
✅ Args
texts : Iterable[str] – List of text contents to be inserted/updated.
metadatas : Optional[List[Dict]] – List of metadata dictionaries for each text (optional).
ids : Optional[List[str]] – Custom IDs for the documents. If not provided, IDs will be auto-generated.
**kwargs : Extra arguments for the underlying vector store.
🔄 Return
None
from uuid import uuid4

# Create an ID for each document
ids = [str(uuid4()) for _ in docs]

args = {
    "texts": [doc.page_content for doc in docs[:2]],
    "metadatas": [doc.metadata for doc in docs[:2]],
    "ids": ids[:2],
    # Add additional parameters if you need
}

crud_manager.upsert(**args)
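If you want to verify the upsert, you can count the documents with the raw client. Elasticsearch indexing is near-real-time, so refresh the index before counting:
# Optional: refresh and count documents (2, assuming a freshly created index)
client.indices.refresh(index=index_name)
print(client.count(index=index_name)["count"])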
Upsert Parallel Document
Perform upserts in parallel for large-scale data
✅ Args
texts : Iterable[str] – List of text contents to be inserted/updated.
metadatas : Optional[List[Dict]] – List of metadata dictionaries for each text (optional).
ids : Optional[List[str]] – Custom IDs for the documents. If not provided, IDs will be auto-generated.
batch_size : int – Number of documents per batch (default: 32).
workers : int – Number of parallel workers (default: 10).
**kwargs : Extra arguments for the underlying vector store.
🔄 Return
None
from uuid import uuid4

args = {
    "texts": [doc.page_content for doc in docs],
    "metadatas": [doc.metadata for doc in docs],
    "ids": ids,
    # Add additional parameters if you need
}

crud_manager.upsert_parallel(**args)
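Since we supplied our own IDs, an individual chunk can be fetched directly by ID with the raw client. This assumes the helper class stores each document under the ID we passed in; the _source field layout depends on the helper's implementation:
# Optional: fetch one document by its ID and inspect its stored fields
res = client.get(index=index_name, id=ids[0])
print(res["_source"].keys())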
Similarity Search
Search for similar documents based on embeddings
✅ Args
query : str – The text query for similarity search.
k : int – Number of top results to return.
filters : Optional[Dict] – Metadata filter conditions (optional).
🔄 Return
results : List[Document] – A list of LangChain Document objects ranked by similarity.
# Search by Query
results = crud_manager.search(query="What is essential is invisible to the eye.", k=3)

for idx, doc in enumerate(results):
    print("=" * 100)
    print(f"Rank {idx + 1} | Title : {doc.metadata['title']}")
    print(f"Contents : {doc.page_content}")
    print(f"Similarity Score : {doc.metadata['score']}")
    print()
====================================================================================================
Rank 1 | Title : Chapter 21
Contents : And he went back to meet the fox.
"Goodbye," he said.
"Goodbye," said the fox. "And now here is my secret, a very simple secret: It is only with the heart that one can see rightly; what is essential is invisible to the eye."
"What is essential is invisible to the eye," the little prince repeated, so that he would be sure to remember.
"It is the time you have wasted for your rose that makes your rose so important."
Similarity Score : 0.7546974
====================================================================================================
Rank 2 | Title : Chapter 24
Contents : "Yes," I said to the little prince. "The house, the stars, the desert-- what gives them their beauty is something that is invisible!"
"I am glad," he said, "that you agree with my fox."
Similarity Score : 0.7476631
====================================================================================================
Rank 3 | Title : Chapter 25
Contents : "The men where you live," said the little prince, "raise five thousand roses in the same garden-- and they do not find in it what they are looking for."
"They do not find it," I replied.
"And yet what they are looking for could be found in one single rose, or in a little water."
"Yes, that is true," I said.
And the little prince added:
"But the eyes are blind. One must look with the heart..."
Similarity Score : 0.7111699
# Search with filters
results = crud_manager.search(
    query="Which asteroid did the little prince come from?",
    k=3,
    filters={"title": "Chapter 4"},
)

for idx, doc in enumerate(results):
    print("=" * 100)
    print(f"Rank {idx + 1} | Title : {doc.metadata['title']}")
    print(f"Contents : {doc.page_content}")
    print(f"Similarity Score : {doc.metadata['score']}")
    print()
====================================================================================================
Rank 1 | Title : Chapter 4
Contents : I have serious reason to believe that the planet from which the little prince came is the asteroid known as B-612. This asteroid has only once been seen through the telescope. That was by a Turkish astronomer, in 1909.
(picture)
On making his discovery, the astronomer had presented it to the International Astronomical Congress, in a great demonstration. But he was in Turkish costume, and so nobody would believe what he said.
Grown-ups are like that...
Similarity Score : 0.8311258
====================================================================================================
Rank 2 | Title : Chapter 4
Contents : - the narrator speculates as to which asteroid from which the little prince came
I had thus learned a second fact of great importance: this was that the planet the little prince came from was scarcely any larger than a house!
Similarity Score : 0.81760435
====================================================================================================
Rank 3 | Title : Chapter 9
Contents : - the little prince leaves his planet
Similarity Score : 0.8035729
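For reference, a metadata filter like {"title": "Chapter 4"} corresponds roughly to a filtered kNN query in the raw Elasticsearch DSL. The field names below are assumptions; the helper class may store the vector and metadata under different paths, so adjust them to your actual mapping:
# Illustrative only: a filtered kNN query in the raw DSL
raw_knn = {
    "field": "vector",  # assumed vector field name
    "query_vector": embedding.embed_query("Which asteroid did the little prince come from?"),
    "k": 3,
    "num_candidates": 50,
    "filter": {"term": {"metadata.title.keyword": "Chapter 4"}},  # assumed metadata path
}
# client.search(index=index_name, knn=raw_knn)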
Delete Document
Delete documents based on filter conditions
✅ Args
ids : Optional[List[str]] – List of document IDs to delete. If None, deletion is based on filter.