# Set environment variables
from dotenv import load_dotenv
from langchain_opentutorial import set_env
# Attempt to load environment variables from a .env file; if unsuccessful, set them manually.
if not load_dotenv():
set_env(
{
"OPENAI_API_KEY": "",
"LANGCHAIN_API_KEY": "",
"LANGCHAIN_TRACING_V2": "true",
"LANGCHAIN_ENDPOINT": "https://api.smith.langchain.com",
"LANGCHAIN_PROJECT": "Elasticsearch",
"HUGGINGFACEHUB_API_TOKEN": "",
"ES_URL": "",
"ES_API_KEY": "",
}
)
# Automatically select the appropriate device
import torch
import platform
def get_device():
if platform.system() == "Darwin": # macOS specific
if hasattr(torch.backends, "mps") and torch.backends.mps.is_available():
print("β Using MPS (Metal Performance Shaders) on macOS")
return "mps"
if torch.cuda.is_available():
print("β Using CUDA (NVIDIA GPU)")
return "cuda"
else:
print("β Using CPU")
return "cpu"
# Set the device
device = get_device()
print("π₯οΈ Current device in use:", device)
β Using MPS (Metal Performance Shaders) on macOS
π₯οΈ Current device in use: mps
# Embedding Model Local Storage Path
import os
import warnings
# Ignore warnings
warnings.filterwarnings("ignore")
# Set the download path to ./cache/
os.environ["HF_HOME"] = "./cache/"
Introduction to Elasticsearch
Elasticsearch is an open-source, distributed search and analytics engine designed to store, search, and analyze both structured and unstructured data in real time.
Key Features
Real-Time Search: Instantly searchable data upon ingestion
Large-Scale Data Processing: Efficient handling of vast datasets
Scalability: Flexible scaling through clustering and distributed architecture
Versatile Search Support: Keyword search, semantic search, and multimodal search
Use Cases
Log Analytics: Real-time monitoring of system and application logs
Monitoring: Server and network health tracking
Product Recommendations: Behavior-based recommendation systems
Natural Language Processing (NLP): Semantic text searches
Multimodal Search: Text-to-image and image-to-image searches
Vector Database Functionality in Elasticsearch
Elasticsearch supports vector data storage and similarity search via Dense Vector Fields. As a vector database, it excels in applications like NLP, image search, and recommendation systems.
Core Vector Database Features
Dense Vector Field: Store and query high-dimensional vectors
KNN (k-Nearest Neighbors) Search: Find vectors most similar to the input
Elasticsearch goes beyond traditional text search engines, offering robust vector database capabilities essential for NLP and multimodal search applications.
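To make these two features concrete, here is a minimal sketch (not part of the tutorial's own pipeline) of a dense_vector mapping and the kNN query body it enables. The field names, the 3-dimensional toy vector, and the candidate-pool size are hypothetical; the index actually used in this tutorial is defined further below.
# Minimal sketch: a dense_vector mapping and a kNN query body (hypothetical names and values)
example_mapping = {
    "properties": {
        "my_text": {"type": "text"},
        "my_vector": {
            "type": "dense_vector",  # high-dimensional vector field
            "dims": 3,               # toy dimension for illustration
            "index": True,           # enable approximate kNN search
            "similarity": "cosine",  # similarity metric used for ranking
        },
    }
}
example_knn = {
    "field": "my_vector",             # vector field to search against
    "query_vector": [0.1, 0.2, 0.3],  # embedding of the query (toy values)
    "k": 3,                           # number of nearest neighbors to return
    "num_candidates": 10,             # candidate pool size per shard
}
# These bodies would be passed to es.indices.create(index=..., mappings=example_mapping)
# and es.search(index=..., knn=example_knn), respectively.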
ElasticsearchManager
Purpose: Simplifies interactions with Elasticsearch, allowing easy management of indices and documents through user-friendly methods.
Core Features
Index management: create, delete, and manage indices.
Document operations: upsert, retrieve, search, and delete documents.
Bulk and parallel operations: perform upserts in bulk or in parallel for high performance.
Methods and Parameters
__init__
Role: Initializes the ElasticsearchManager instance and connects to the Elasticsearch cluster.
Parameters
es_url (str): The URL of the Elasticsearch host (default: "http://localhost:9200").
api_key (Optional[str]): The API key for authentication (default: None).
Behavior
Establishes a connection to Elasticsearch.
Tests the connection using ping() and raises a ConnectionError if it fails.
This class provides a robust and user-friendly interface to manage Elasticsearch operations.
It encapsulates common tasks like creating indices, searching for documents, and performing upserts, making it ideal for use in data management pipelines or applications.
from typing import Optional, Dict, List, Generator
from elasticsearch import Elasticsearch, helpers
from concurrent.futures import ThreadPoolExecutor
class ElasticsearchManager:
def __init__(
self, es_url: str = "http://localhost:9200", api_key: Optional[str] = None
) -> None:
"""
Initialize the ElasticsearchManager with a connection to the Elasticsearch instance.
Parameters:
es_url (str): URL of the Elasticsearch host.
api_key (Optional[str]): API key for authentication (optional).
"""
# Initialize the Elasticsearch client
if api_key:
self.es = Elasticsearch(es_url, api_key=api_key, timeout=120, retry_on_timeout=True)
else:
self.es = Elasticsearch(es_url, timeout=120, retry_on_timeout=True)
# Test connection
if self.es.ping():
print("β Successfully connected to Elasticsearch!")
else:
raise ConnectionError("β Failed to connect to Elasticsearch.")
def create_index(
self,
index_name: str,
mapping: Optional[Dict] = None,
settings: Optional[Dict] = None,
) -> str:
"""
Create an Elasticsearch index with optional mapping and settings.
Parameters:
index_name (str): Name of the index to create.
mapping (Optional[Dict]): Mapping definition for the index.
settings (Optional[Dict]): Settings definition for the index.
Returns:
str: Success or warning message.
"""
try:
if not self.es.indices.exists(index=index_name):
body = {}
if mapping:
body["mappings"] = mapping
if settings:
body["settings"] = settings
self.es.indices.create(index=index_name, body=body)
return f"β Index '{index_name}' created successfully."
else:
return f"β οΈ Index '{index_name}' already exists. Skipping creation."
except Exception as e:
return f"β Error creating index '{index_name}': {e}"
def delete_index(self, index_name: str) -> str:
"""
Delete an Elasticsearch index if it exists.
Parameters:
index_name (str): Name of the index to delete.
Returns:
str: Success or warning message.
"""
try:
if self.es.indices.exists(index=index_name):
self.es.indices.delete(index=index_name)
return f"β Index '{index_name}' deleted successfully."
else:
return f"β οΈ Index '{index_name}' does not exist."
except Exception as e:
return f"β Error deleting index '{index_name}': {e}"
def get_document(self, index_name: str, document_id: str) -> Optional[Dict]:
"""
Retrieve a single document by its ID.
Parameters:
index_name (str): The index to retrieve the document from.
document_id (str): The ID of the document to retrieve.
Returns:
Optional[Dict]: The document's content if found, None otherwise.
"""
try:
response = self.es.get(index=index_name, id=document_id)
return response["_source"]
except Exception as e:
print(f"β Error retrieving document: {e}")
return None
def search_documents(self, index_name: str, query: Dict) -> List[Dict]:
"""
Search for documents based on a query.
Parameters:
index_name (str): The index to search.
query (Dict): The query body for the search.
Returns:
List[Dict]: List of documents that match the query.
"""
try:
response = self.es.search(index=index_name, body={"query": query})
return [hit["_source"] for hit in response["hits"]["hits"]]
except Exception as e:
print(f"β Error searching documents: {e}")
return []
def upsert_document(
self, index_name: str, document_id: str, document: Dict
) -> Dict:
"""
Perform an upsert operation on a single document.
Parameters:
index_name (str): The index to perform the upsert on.
document_id (str): The ID of the document.
document (Dict): The document content to upsert.
Returns:
Dict: The response from Elasticsearch.
"""
try:
response = self.es.update(
index=index_name,
id=document_id,
body={"doc": document, "doc_as_upsert": True},
)
return response
except Exception as e:
print(f"β Error upserting document: {e}")
return {}
def bulk_upsert(
self, index_name: str, documents: List[Dict], timeout: Optional[str] = None
) -> None:
"""
Perform a bulk upsert operation.
Parameters:
            index_name (str): Default index name for the documents.
documents (List[Dict]): List of documents for bulk upsert.
timeout (Optional[str]): Timeout duration (e.g., '60s', '2m'). If None, the default timeout is used.
"""
try:
# Ensure each document includes an `_index` field
for doc in documents:
if "_index" not in doc:
doc["_index"] = index_name
# Perform the bulk operation
helpers.bulk(self.es, documents, timeout=timeout)
print("β Bulk upsert completed successfully.")
except Exception as e:
print(f"β Error in bulk upsert: {e}")
def parallel_bulk_upsert(
self,
index_name: str,
documents: List[Dict],
batch_size: int = 100,
max_workers: int = 4,
timeout: Optional[str] = None,
) -> None:
"""
Perform a parallel bulk upsert operation.
Parameters:
index_name (str): Default index name for documents.
documents (List[Dict]): List of documents for bulk upsert.
batch_size (int): Number of documents per batch.
max_workers (int): Number of parallel threads.
timeout (Optional[str]): Timeout duration (e.g., '60s', '2m'). If None, the default timeout is used.
"""
def chunk_data(
data: List[Dict], chunk_size: int
) -> Generator[List[Dict], None, None]:
"""Split data into chunks."""
for i in range(0, len(data), chunk_size):
yield data[i : i + chunk_size]
# Ensure each document has an `_index` field
for doc in documents:
if "_index" not in doc:
doc["_index"] = index_name
batches = list(chunk_data(documents, batch_size))
def bulk_upsert_batch(batch: List[Dict]):
helpers.bulk(self.es, batch, timeout=timeout)
        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            futures = [executor.submit(bulk_upsert_batch, batch) for batch in batches]
            # Surface any exception raised inside a worker thread
            for future in futures:
                try:
                    future.result()
                except Exception as e:
                    print(f"Error in parallel bulk upsert batch: {e}")
def delete_document(self, index_name: str, document_id: str) -> Dict:
"""
Delete a single document by its ID.
Parameters:
index_name (str): The index to delete the document from.
document_id (str): The ID of the document to delete.
Returns:
Dict: The response from Elasticsearch.
"""
try:
response = self.es.delete(index=index_name, id=document_id)
return response
except Exception as e:
print(f"β Error deleting document: {e}")
return {}
def delete_by_query(self, index_name: str, query: Dict) -> Dict:
"""
Delete documents based on a query.
Parameters:
index_name (str): The index to delete documents from.
query (Dict): The query body for the delete operation.
Returns:
Dict: The response from Elasticsearch.
"""
try:
response = self.es.delete_by_query(
index=index_name, body={"query": query}, conflicts="proceed"
)
return response
except Exception as e:
print(f"β Error deleting documents by query: {e}")
return {}
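Before loading the tutorial data, here is a brief usage sketch of the class above. The index name smoke_test_index is hypothetical, and the snippet assumes the ES_URL and ES_API_KEY environment variables set earlier point to a reachable cluster.
# Usage sketch for ElasticsearchManager (hypothetical index name)
import os

manager = ElasticsearchManager(
    es_url=os.environ.get("ES_URL", "http://localhost:9200"),
    api_key=os.environ.get("ES_API_KEY"),
)
print(manager.create_index("smoke_test_index", mapping={"properties": {"text": {"type": "text"}}}))
manager.upsert_document("smoke_test_index", "doc-1", {"text": "hello elasticsearch"})
print(manager.get_document("smoke_test_index", "doc-1"))  # expected: {'text': 'hello elasticsearch'}
print(manager.delete_index("smoke_test_index"))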
Data Preparation for Tutorial
Let's process The Little Prince with the RecursiveCharacterTextSplitter to create document chunks.
Then, we'll generate embeddings for each chunk and store the resulting data in the vector database for the rest of the tutorial.
from langchain_text_splitters import RecursiveCharacterTextSplitter
# Function to read text from a file (Cross-Platform)
def read_text_file(file_path):
try:
with open(file_path, encoding="utf-8") as f:
# Normalize line endings (compatible with Windows, macOS, Linux)
raw_text = f.read().replace("\r\n", "\n").replace("\r", "\n")
return raw_text
except UnicodeDecodeError as e:
raise ValueError(f"Failed to decode the file with UTF-8 encoding: {e}")
except FileNotFoundError:
raise FileNotFoundError(f"The specified file was not found: {file_path}")
# Function to split the text into chunks
def split_text(raw_text, chunk_size=100, chunk_overlap=20):
text_splitter = RecursiveCharacterTextSplitter(
chunk_size=chunk_size,
chunk_overlap=chunk_overlap,
length_function=len, # Default string length function
is_separator_regex=False, # Default separator setting
)
split_docs = text_splitter.create_documents([raw_text])
return [doc.page_content for doc in split_docs]
# Set file path and execute
file_path = "./data/the_little_prince.txt"
try:
# Read the file
raw_text = read_text_file(file_path)
# Split the text
docs = split_text(raw_text)
# Verify output
    print(docs[:2])  # Print the first 2 chunks
print(f"Total number of chunks: {len(docs)}")
except Exception as e:
print(f"Error occurred: {e}")
['The Little Prince\nWritten By Antoine de Saiot-Exupery (1900γ1944)', '[ Antoine de Saiot-Exupery ]']
Total number of chunks: 1359
%%time
## text embedding
from langchain_huggingface.embeddings import HuggingFaceEmbeddings
model_name = "intfloat/multilingual-e5-large-instruct"
hf_embeddings_e5_instruct = HuggingFaceEmbeddings(
model_name=model_name,
model_kwargs={"device": device}, # mps, cuda, cpu
encode_kwargs={"normalize_embeddings": True},
)
embedded_documents = hf_embeddings_e5_instruct.embed_documents(docs)
print(len(embedded_documents))
print(len(embedded_documents[0]))
1359
1024
CPU times: user 9.33 s, sys: 3.24 s, total: 12.6 s
Wall time: 23.3 s
from uuid import uuid4
from typing import List, Tuple, Dict
def prepare_documents_with_ids(
docs: List[str], embedded_documents: List[List[float]]
) -> Tuple[List[Dict], List[str]]:
"""
Prepare a list of documents with unique IDs and their corresponding embeddings.
Parameters:
docs (List[str]): List of document texts.
embedded_documents (List[List[float]]): List of embedding vectors corresponding to the documents.
Returns:
Tuple[List[Dict], List[str]]: A tuple containing:
- List of document dictionaries with `doc_id`, `text`, and `vector`.
- List of unique document IDs (`doc_ids`).
"""
# Generate unique IDs for each document
doc_ids = [str(uuid4()) for _ in range(len(docs))]
# Prepare the document list with IDs, texts, and embeddings
documents = [
{"doc_id": doc_id, "text": doc, "vector": embedding}
for doc, doc_id, embedding in zip(docs, doc_ids, embedded_documents)
]
return documents, doc_ids
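Applying this helper to the chunks and embeddings created above yields the payloads that will be loaded into Elasticsearch. This call was not shown with output in the original run; it is a quick sanity check.
# Build the document payloads and their unique IDs from the chunks and embeddings above
documents, doc_ids = prepare_documents_with_ids(docs, embedded_documents)
print(len(documents), len(doc_ids))  # both equal the number of chunks
print(list(documents[0].keys()))     # ['doc_id', 'text', 'vector']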
import os
# Load environment variables
ES_URL = os.environ.get("ES_URL")  # Elasticsearch host URL
ES_API_KEY = os.environ.get("ES_API_KEY")  # Elasticsearch API key
# Ensure required environment variables are set
if not ES_URL or not ES_API_KEY:
raise ValueError("Both ES_URL and ES_API_KEY must be set in environment variables.")
# create index
index_name = "langchain_tutorial_es"
# vector dimension
dims = len(embedded_documents[0])
# Define the mapping for the new index
# This structure specifies the schema for documents stored in Elasticsearch
mapping = {
"properties": {
"metadata": {"properties": {"doc_id": {"type": "keyword"}}},
"text": {"type": "text"}, # Field for storing textual content
"vector": { # Field for storing vector embeddings
"type": "dense_vector", # Specifies dense vector type
"dims": dims, # Number of dimensions in the vector
"index": True, # Enable indexing for vector search
"similarity": "cosine", # Use cosine similarity for vector comparisons
},
}
}
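The cell that actually populates the index is not reproduced above, so the following is a reconstruction sketch of that step: it connects through ElasticsearchManager, creates the index with the mapping just defined, and loads the document payloads built earlier with parallel_bulk_upsert. The batch size and worker count are illustrative choices rather than values confirmed by the original run.
# Reconstruction sketch: connect, create the index, and load the embedded documents
es_manager = ElasticsearchManager(es_url=ES_URL, api_key=ES_API_KEY)
print(es_manager.create_index(index_name, mapping=mapping))

# `documents` was built by prepare_documents_with_ids above
es_manager.parallel_bulk_upsert(
    index_name=index_name,
    documents=documents,
    batch_size=100,  # illustrative; matches the method's default
    max_workers=4,   # illustrative; matches the method's default
)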
Keyword Search
Keyword search matches documents that contain the exact keyword in their text field.
It performs a straightforward text-based search using Elasticsearch's match query.
Semantic Search
Semantic search leverages embeddings to find documents based on their contextual meaning rather than exact text matches.
It uses a pre-trained model (hf_embeddings_e5_instruct) to encode both the query and the documents into vector representations and retrieves the most similar results.
Hybrid Search
Hybrid search combines both keyword search and semantic search to provide more comprehensive results.
It uses a filtering mechanism to ensure documents meet specific keyword criteria while scoring and ranking results based on their semantic similarity to the query.
# keyword search
keyword = "fox"
query = {"match": {"text": keyword}}
results = es_manager.search_documents(index_name, query=query)
for idx_, result in enumerate(results):
if idx_ < 3:
print(idx_, " :", result["text"])
0 : "I am a fox," said the fox.
1 : "Good morning," said the fox.
2 : "Ah," said the fox, "I shall cry."
from langchain_elasticsearch import ElasticsearchStore
# Initialize ElasticsearchStore
vector_store = ElasticsearchStore(
index_name=index_name, # Elasticsearch index name
embedding=hf_embeddings_e5_instruct, # Object responsible for text embeddings
es_url=ES_URL, # Elasticsearch host URL
es_api_key=ES_API_KEY, # Elasticsearch API key for authentication
)
# Execute Semantic Search
search_query = "Who are the Little Princeβs friends?"
results = vector_store.similarity_search(search_query, k=3)
print("π Question: ", search_query)
print("π€ Semantic Search Results:")
for result in results:
print(f"- {result.page_content}")
Question: Who are the Little Prince's friends?
Semantic Search Results:
- "Who are you?" said the little prince.
- "Then what?" asked the little prince.
- And the little prince asked himself:
# hybrid search with score
search_query = "Who are the Little Princeβs friends?"
keyword = "friend"
results = vector_store.similarity_search_with_score(
query=search_query,
k=1,
filter=[{"term": {"text": keyword}}],
)
print("π search_query: ", search_query)
print("π keyword: ", keyword)
for doc, score in results:
print(f"* [SIM={score:3f}] {doc.page_content}")
search_query: Who are the Little Prince's friends?
keyword: friend
* [SIM=0.927641] "My friend the fox--" the little prince said to me.
As the example shows, hybrid search noticeably improves result quality: the returned passage both contains the required keyword and is semantically close to the query.
This approach ensures that the search results are both contextually meaningful and aligned with the specified keyword constraint, making it especially useful in scenarios where both precision and context matter.
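For reference, the hybrid call above corresponds roughly to a raw kNN search with a keyword filter issued directly against the cluster. The sketch below is an approximation, not the exact request body LangChain generates, and num_candidates is an illustrative choice.
# Rough raw-API equivalent of the hybrid search above (approximation, for illustration only)
query_vector = hf_embeddings_e5_instruct.embed_query(search_query)
response = es_manager.es.search(
    index=index_name,
    knn={
        "field": "vector",                      # dense_vector field from the mapping
        "query_vector": query_vector,           # embedded query text
        "k": 1,                                 # return the single best match
        "num_candidates": 50,                   # illustrative candidate pool size
        "filter": {"term": {"text": keyword}},  # keyword constraint
    },
)
for hit in response["hits"]["hits"]:
    print(f"* [SIM={hit['_score']:.6f}] {hit['_source']['text']}")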