Foundation Model API Embeddings for Vector Search#
Databricks Vector Search is a vector database built into Databricks that offers straightforward integration with the Databricks Foundation Model API embedding models.
This notebook walks through the process of setting up a vector index and configuring it to automatically use an embedding model from the Foundation Model API to generate embeddings. It then shows how to load some text data into the vector database and how to query the database.
To learn more about how Databricks Vector Search works, see the Databricks Vector Search documentation.
Setup#
First, we will install the necessary libraries and set up a temporary catalog/schema/table for this example. If you do not have permissions to create catalogs, you can specify a catalog manually.
%pip install --upgrade --force-reinstall databricks-vectorsearch databricks-genai-inference
dbutils.library.restartPython()
import re
from datetime import datetime
import uuid
# Assuming DB_USER is fetched from the Databricks utility
DB_USER = dbutils.notebook.entry_point.getDbutils().notebook().getContext().userName().get()
# Sanitize the DB_USER by replacing non-alphanumeric characters with underscores
DB_USER_SANITIZED = re.sub(r'\W', '_', DB_USER)
# Append a timestamp and a UUID to ensure uniqueness
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
unique_id = str(uuid.uuid4()).split('-')[0] # Get the first part of the UUID
# Create a definitely unique DB_USER identifier
DB_USER_UNIQUE = f"{DB_USER_SANITIZED}_{timestamp}_{unique_id}"
print(DB_USER_UNIQUE)
daniel_liden_databricks_com_20240207_152232_7b118968
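The naming logic above can be wrapped in a small function so that it is easy to check outside a notebook. The sketch below is illustrative only: make_unique_name is a hypothetical helper, and the user name, timestamp, and suffix are passed in explicitly (rather than read from dbutils, datetime.now(), and uuid4()) so that the result is reproducible.

```python
import re
from datetime import datetime


def make_unique_name(user: str, now: datetime, suffix: str) -> str:
    """Build a unique, identifier-safe name from a user name (hypothetical helper)."""
    # Replace every character that is not a letter, digit, or underscore.
    sanitized = re.sub(r"\W", "_", user)
    # Same timestamp layout as the cell above: YYYYMMDD_HHMMSS.
    timestamp = now.strftime("%Y%m%d_%H%M%S")
    return f"{sanitized}_{timestamp}_{suffix}"


# Example values are made up for illustration.
example = make_unique_name(
    "jane.doe@example.com",
    datetime(2024, 2, 7, 15, 22, 32),
    "7b118968",
)
print(example)  # jane_doe_example_com_20240207_152232_7b118968
```

Note that \W is Unicode-aware for Python strings, so accented letters in a user name are kept rather than replaced.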
Define temporary catalog, table, endpoint, and index names#
CATALOG = DB_USER_UNIQUE
DB = "FM_API_EXAMPLES"
VS_ENDPOINT_NAME = "test_endpoint"
VS_INDEX_NAME = "FM_API_EXAMPLES_VS_INDEX"
SOURCE_TABLE_NAME = "FM_API_EXAMPLES_DATA"
VS_INDEX_FULLNAME = f"{CATALOG}.{DB}.{VS_INDEX_NAME}"
SOURCE_TABLE_FULLNAME = f"{CATALOG}.{DB}.{SOURCE_TABLE_NAME}"
DATABRICKS_TOKEN = dbutils.secrets.get(scope="daniel.liden", key="rag_demo")
Create Catalog, Schema, and Table#
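A Databricks Vector Search index is created from a Delta table. The source Delta table holds the data we ultimately want to index and search with the vector database. In this cell, we create the catalog, schema, and source table from which we will create the index. The table is created with Change Data Feed enabled (delta.enableChangeDataFeed), which a Delta Sync index relies on to pick up changes to the source table.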
# Set up catalog/schema/table
spark.sql(f"CREATE CATALOG IF NOT EXISTS {CATALOG}")
spark.sql(f"CREATE SCHEMA IF NOT EXISTS {CATALOG}.{DB}")
spark.sql(
    f"""CREATE TABLE IF NOT EXISTS {SOURCE_TABLE_FULLNAME} (
        id STRING,
        text STRING,
        date DATE,
        title STRING
    )
    USING delta
    TBLPROPERTIES ('delta.enableChangeDataFeed' = 'true')
    """
)
DataFrame[]
Set up the Vector Database#
Next, we set up the vector database. There are three key steps:
Initialize the vector search client
Create the endpoint
Create the index using the source Delta table we created earlier and the bge-large-en embeddings model from the Foundation Model API
Initialize the Vector Search Client#
from databricks.vector_search.client import VectorSearchClient
vsc = VectorSearchClient()
[NOTICE] Using a notebook authentication token. Recommended for development only. For improved performance, please use Service Principal based authentication. To disable this message, pass disable_notice=True to VectorSearchClient().
Create the Endpoint#
The cell below will check if the endpoint already exists and create it if it does not.
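# Create the endpoint only if no endpoint with this name exists yet
existing_endpoints = [endpoint["name"] for endpoint in vsc.list_endpoints()["endpoints"]]
if VS_ENDPOINT_NAME not in existing_endpoints:
    vsc.create_endpoint(VS_ENDPOINT_NAME)
else:
    print("Endpoint already exists.")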
Endpoint already exists.
Create the Vector Index#
Now we can create the index over the Delta table we created earlier.
# set up an index with managed embeddings
index = vsc.create_delta_sync_index(
    endpoint_name=VS_ENDPOINT_NAME,
    index_name=VS_INDEX_FULLNAME,
    source_table_name=SOURCE_TABLE_FULLNAME,
    pipeline_type="TRIGGERED",
    primary_key="id",
    embedding_source_column="text",
    embedding_model_endpoint_name="databricks-bge-large-en",
)
There are a few key points to note about the specific configuration we used in this case:
We used pipeline_type="TRIGGERED". This requires us to use the index’s sync() method to manually sync the source Delta table with the index. We could, alternatively, use pipeline_type="CONTINUOUS", which will automatically keep the index in sync with the source table with only seconds of latency. This approach is more costly, though, as a compute cluster must be provisioned for the continuous sync streaming pipeline.
We specified embedding_model_endpoint_name="databricks-bge-large-en". We can use any embedding model available via model serving; this is the name of the pay-per-token Foundation Model API version of databricks-bge-large-en. By passing an embedding_source_column and embedding_model_endpoint_name, we configure the index such that it will automatically use the model to generate embeddings for the texts in the text column of the source table. We do not need to manually generate embeddings.
If, however, we did want to manage embeddings manually, we could include the following arguments instead:
embedding_vector_column="<embedding_column>", embedding_dimension=<embedding_dimension>
In the latter approach, we include a column for embeddings in the source Delta table, and embeddings are not computed automatically from the text column.
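To make the self-managed option more concrete, the sketch below shows what source rows could look like when each row carries its own embedding. It is not part of the original notebook: toy_embed is a hypothetical stand-in for a real embedding model, the dimension of 8 is arbitrary, and the column name "embedding" is an assumption. The point is only that every row needs a vector whose length matches the configured embedding dimension.

```python
import hashlib
import math

EMBEDDING_DIMENSION = 8  # toy value; a real embedding model defines its own dimension


def toy_embed(text, dim=EMBEDDING_DIMENSION):
    """Hypothetical stand-in for an embedding model: hash words into a unit vector."""
    vec = [0.0] * dim
    for word in text.lower().split():
        # Use the first byte of a stable hash to pick a bucket for this word.
        digest = hashlib.md5(word.encode("utf-8")).digest()
        vec[digest[0] % dim] += 1.0
    # Normalize to unit length; fall back to 1.0 to avoid dividing by zero.
    norm = math.sqrt(sum(v * v for v in vec)) or 1.0
    return [v / norm for v in vec]


# Rows as they might be written to a source table with an embedding column.
rows = [
    {"id": "doc_0", "text": "resource utilization efficiency"},
    {"id": "doc_1", "text": "time to decision reduction"},
]
for row in rows:
    row["embedding"] = toy_embed(row["text"])

for row in rows:
    print(row["id"], len(row["embedding"]))
```

With managed embeddings, as configured in this notebook, none of this is needed: the index calls the embedding model itself.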
Set up some example texts#
Now we set up some example texts to index.
# Some example texts
from datetime import datetime
smarter_overview = {"text":"""
S.M.A.R.T.E.R. Initiative: Strategic Management for Achieving Results through Efficiency and Resources
Introduction
The S.M.A.R.T.E.R. Initiative, standing for "Strategic Management for Achieving Results through Efficiency and Resources," is a groundbreaking project aimed at revolutionizing the way our organization operates. In today's rapidly changing business landscape, achieving success demands a strategic approach that leverages resources effectively while optimizing efficiency. The S.M.A.R.T.E.R. Initiative is designed to do just that.
Background
As markets evolve and competition intensifies, organizations must adapt to stay relevant and profitable. Traditional methods of operation often become inefficient and costly. The S.M.A.R.T.E.R. Initiative was conceived as a response to this challenge, with the primary goal of enhancing strategic management practices to achieve better results.
Objectives
1. Resource Optimization
One of the key objectives of the S.M.A.R.T.E.R. Initiative is to optimize resource allocation. This involves identifying underutilized resources, streamlining processes, and reallocating resources to areas that contribute most to our strategic goals.
2. Efficiency Improvement
Efficiency is at the core of the S.M.A.R.T.E.R. Initiative. By identifying bottlenecks and improving processes, we aim to reduce operational costs, shorten project timelines, and enhance overall productivity.
3. Strategic Alignment
For any organization to succeed, its activities must be aligned with its strategic objectives. The S.M.A.R.T.E.R. Initiative will ensure that every action and resource allocation is in sync with our long-term strategic goals.
4. Results-driven Approach
The ultimate measure of success is results. The S.M.A.R.T.E.R. Initiative will foster a results-driven culture within our organization, where decisions and actions are guided by their impact on our bottom line and strategic objectives.
Key Components
The S.M.A.R.T.E.R. Initiative comprises several key components:
1. Data Analytics and Insights
Data is the foundation of informed decision-making. We will invest in advanced data analytics tools to gain insights into our operations, customer behavior, and market trends. These insights will guide our resource allocation and strategy.
2. Process Automation
Automation will play a vital role in enhancing efficiency. Routine and repetitive tasks will be automated, freeing up our workforce to focus on more strategic activities.
3. Performance Metrics and KPIs
To ensure that our efforts are aligned with our objectives, we will establish a comprehensive set of Key Performance Indicators (KPIs). Regular monitoring and reporting will provide visibility into our progress.
4. Training and Development
Enhancing our workforce's skills is essential. We will invest in training and development programs to equip our employees with the knowledge and tools needed to excel in their roles.
Implementation Timeline
The S.M.A.R.T.E.R. Initiative will be implemented in phases over the next three years. This phased approach allows for a smooth transition and ensures that each component is integrated effectively into our operations.
Conclusion
The S.M.A.R.T.E.R. Initiative represents a significant step forward for our organization. By strategically managing our resources and optimizing efficiency, we are positioning ourselves for sustained success in a competitive marketplace. This initiative is a testament to our commitment to excellence and our dedication to achieving exceptional results.
As we embark on this journey, we look forward to the transformative impact that the S.M.A.R.T.E.R. Initiative will have on our organization and the benefits it will bring to our employees, customers, and stakeholders.
""", "title": "Project Kickoff", "date": datetime.strptime("2024-01-16", "%Y-%m-%d")}
smarter_kpis = {"text": """S.M.A.R.T.E.R. Initiative: Key Performance Indicators (KPIs)
Introduction
The S.M.A.R.T.E.R. Initiative (Strategic Management for Achieving Results through Efficiency and Resources) is designed to drive excellence within our organization. To measure the success and effectiveness of this initiative, we have established three concrete and measurable Key Performance Indicators (KPIs). This document outlines these KPIs and their associated targets.
Key Performance Indicators (KPIs)
1. Resource Utilization Efficiency (RUE)
Objective: To optimize resource utilization for cost-efficiency.
KPI Definition: RUE will be calculated as (Actual Resource Utilization / Planned Resource Utilization) * 100%.
Target: Achieve a 15% increase in RUE within the first year.
2. Time-to-Decision Reduction (TDR)
Objective: To streamline operational processes and reduce decision-making time.
KPI Definition: TDR will be calculated as (Pre-Initiative Decision Time - Post-Initiative Decision Time) / Pre-Initiative Decision Time.
Target: Achieve a 20% reduction in TDR for critical business decisions.
3. Strategic Goals Achievement (SGA)
Objective: To ensure that organizational activities align with strategic goals.
KPI Definition: SGA will measure the percentage of predefined strategic objectives achieved.
Target: Achieve an 80% Strategic Goals Achievement rate within two years.
Conclusion
These three KPIs, Resource Utilization Efficiency (RUE), Time-to-Decision Reduction (TDR), and Strategic Goals Achievement (SGA), will serve as crucial metrics for evaluating the success of the S.M.A.R.T.E.R. Initiative. By tracking these KPIs and working towards their targets, we aim to drive efficiency, optimize resource utilization, and align our actions with our strategic objectives. This focus on measurable outcomes will guide our efforts towards achieving excellence within our organization.""",
"title": "Project KPIs", "date": datetime.strptime("2024-01-16", "%Y-%m-%d")}
Chunk the texts#
Typically, when using a vector database for retrieval-augmented generation (RAG) tasks, we break the texts apart into smaller (and sometimes overlapping) chunks in order to return focused and relevant information without returning an excessive amount of text.
In the code below, we break the sample texts above into shorter overlapping text chunks.
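import re

def chunk_text(text, chunk_size, overlap):
    # Split on whitespace; chunk sizes are measured in words
    words = text.split()
    chunks = []
    index = 0
    while index < len(words):
        end = index + chunk_size
        # Extend the chunk until it ends on a sentence-ending word
        while end < len(words) and not re.match(r'.*[.!?]\s*$', words[end]):
            end += 1
        chunk = ' '.join(words[index:end+1])
        chunks.append(chunk)
        # Step forward, leaving `overlap` words shared with the next chunk
        index += chunk_size - overlap
    return chunks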
chunks = []
documents = [smarter_overview, smarter_kpis]
for document in documents:
    for i, c in enumerate(chunk_text(document["text"], 150, 25)):
        chunk = {}
        chunk["text"] = c
        chunk["title"] = document["title"]
        chunk["date"] = document["date"]
        chunk["id"] = document["title"] + "_" + str(i)
        chunks.append(chunk)
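To see how chunk_size and overlap interact, it can help to run the chunking function on a text short enough to follow by hand. The block below repeats chunk_text from above so that it runs on its own; the sample sentence and the small chunk_size and overlap values are made up for illustration.

```python
import re


def chunk_text(text, chunk_size, overlap):
    # Copy of the notebook's chunking function, repeated so this block is self-contained.
    words = text.split()
    chunks = []
    index = 0
    while index < len(words):
        end = index + chunk_size
        # Extend the chunk until it ends on a sentence-ending word.
        while end < len(words) and not re.match(r'.*[.!?]\s*$', words[end]):
            end += 1
        chunks.append(' '.join(words[index:end + 1]))
        index += chunk_size - overlap
    return chunks


sample = "One two three. Four five six. Seven eight nine. Ten eleven twelve."
small_chunks = chunk_text(sample, chunk_size=4, overlap=1)
for n, c in enumerate(small_chunks):
    print(n, c)
# 0 One two three. Four five six.
# 1 Four five six. Seven eight nine.
# 2 Seven eight nine. Ten eleven twelve.
# 3 Ten eleven twelve.
```

Two things stand out. Because each chunk is extended to the next sentence boundary, chunks are usually longer than chunk_size and the effective overlap is larger than the overlap argument. The final chunk can also be fully contained in the one before it, which is harmless for this example but worth knowing when counting chunks.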
Insert the text chunks into the source delta table#
Now we save the chunks, along with some metadata (a document title, date, and a unique id) to the source delta table.
from pyspark.sql.types import StructType, StructField, StringType, ArrayType, FloatType, DateType
schema = StructType(
    [
        StructField("id", StringType(), True),
        StructField("text", StringType(), True),
        StructField("title", StringType(), True),
        StructField("date", DateType(), True),
    ]
)
if chunks:
    result_df = spark.createDataFrame(chunks, schema=schema)
    result_df.write.format("delta").mode("append").saveAsTable(
        SOURCE_TABLE_FULLNAME
    )
Sync the Vector Search Index#
Because we specified pipeline_type="TRIGGERED" when configuring the Delta Sync index, we still need to manually tell the index to sync with the Delta table. This will take a few minutes.
# Sync
index = vsc.get_index(endpoint_name=VS_ENDPOINT_NAME,
                      index_name=VS_INDEX_FULLNAME)
index.sync()
{}
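Since the sync takes a few minutes, a query issued immediately afterwards may not see the new rows. One way to handle this is a small polling loop, sketched below. This is a generic helper, not part of the Vector Search client: wait_until and its parameters are hypothetical, and is_ready is whatever status check you supply. How the index status is read depends on the client version and is deliberately left out here; the example uses a fake check that becomes ready on its third call.

```python
import time


def wait_until(is_ready, timeout_s=600.0, poll_s=10.0):
    """Poll is_ready() until it returns True or timeout_s elapses (hypothetical helper)."""
    deadline = time.monotonic() + timeout_s
    while True:
        if is_ready():
            return True
        if time.monotonic() >= deadline:
            return False
        time.sleep(poll_s)


# Stand-in for a real status check: reports ready on the third poll.
calls = {"n": 0}


def fake_status():
    calls["n"] += 1
    return calls["n"] >= 3


ready = wait_until(fake_status, timeout_s=5.0, poll_s=0.01)
print(ready, calls["n"])  # True 3
```

In a notebook, is_ready would wrap a call that inspects the index status, and the timeout and polling interval would be set to minutes and seconds rather than the tiny values used here.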
Query the Vector Index#
Now that we have added our text chunks to the source Delta table and synced it with the Vector Search index, we’re ready to query the index! We do this with the index.similarity_search() method.
The columns argument takes a list of the columns we want returned; in this case, we request the text and title columns.
# query
index.similarity_search(columns=["text", "title"],
                        query_text="What is the TDR Target for the SMARTER initiative?",
                        num_results=3)
{'manifest': {'column_count': 3,
'columns': [{'name': 'text'}, {'name': 'title'}, {'name': 'score'}]},
'result': {'row_count': 3,
'data_array': [['S.M.A.R.T.E.R. Initiative: Key Performance Indicators (KPIs) Introduction The S.M.A.R.T.E.R. Initiative (Strategic Management for Achieving Results through Efficiency and Resources) is designed to drive excellence within our organization. To measure the success and effectiveness of this initiative, we have established three concrete and measurable Key Performance Indicators (KPIs). This document outlines these KPIs and their associated targets. Key Performance Indicators (KPIs) 1. Resource Utilization Efficiency (RUE) Objective: To optimize resource utilization for cost-efficiency. KPI Definition: RUE will be calculated as (Actual Resource Utilization / Planned Resource Utilization) * 100%. Target: Achieve a 15% increase in RUE within the first year. 2. Time-to-Decision Reduction (TDR) Objective: To streamline operational processes and reduce decision-making time. KPI Definition: TDR will be calculated as (Pre-Initiative Decision Time - Post-Initiative Decision Time) / Pre-Initiative Decision Time. Target: Achieve a 20% reduction in TDR for critical business decisions. 3. Strategic Goals Achievement (SGA) Objective: To ensure that organizational activities align with strategic goals.',
'Project KPIs',
0.56844866],
['Time) / Pre-Initiative Decision Time. Target: Achieve a 20% reduction in TDR for critical business decisions. 3. Strategic Goals Achievement (SGA) Objective: To ensure that organizational activities align with strategic goals. KPI Definition: SGA will measure the percentage of predefined strategic objectives achieved. Target: Achieve an 80% Strategic Goals Achievement rate within two years. Conclusion These three KPIs, Resource Utilization Efficiency (RUE), Time-to-Decision Reduction (TDR), and Strategic Goals Achievement (SGA), will serve as crucial metrics for evaluating the success of the S.M.A.R.T.E.R. Initiative. By tracking these KPIs and working towards their targets, we aim to drive efficiency, optimize resource utilization, and align our actions with our strategic objectives. This focus on measurable outcomes will guide our efforts towards achieving excellence within our organization.',
'Project KPIs',
0.5681397],
['S.M.A.R.T.E.R. Initiative is to optimize resource allocation. This involves identifying underutilized resources, streamlining processes, and reallocating resources to areas that contribute most to our strategic goals. 2. Efficiency Improvement Efficiency is at the core of the S.M.A.R.T.E.R. Initiative. By identifying bottlenecks and improving processes, we aim to reduce operational costs, shorten project timelines, and enhance overall productivity. 3. Strategic Alignment For any organization to succeed, its activities must be aligned with its strategic objectives. The S.M.A.R.T.E.R. Initiative will ensure that every action and resource allocation is in sync with our long-term strategic goals. 4. Results-driven Approach The ultimate measure of success is results. The S.M.A.R.T.E.R. Initiative will foster a results-driven culture within our organization, where decisions and actions are guided by their impact on our bottom line and strategic objectives. Key Components The S.M.A.R.T.E.R. Initiative comprises several key components: 1. Data Analytics and Insights Data is the foundation of informed decision-making.',
'Project Kickoff',
0.55467147]]},
'next_page_token': '',
'debug_info': {'response_time': 897.0,
'ann_time': 179.0,
'embedding_gen_time': 534.0}}
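The response above is a nested dictionary: the column names live under manifest, and each result row under result.data_array is a plain list in the same column order, with the similarity score appended as the last column. A small helper can pair the two so each hit becomes a dictionary. This is a sketch based only on the response shape shown above; rows_from_response is a hypothetical helper, and the sample response is abbreviated, made-up data.

```python
def rows_from_response(response):
    """Turn a similarity_search-style response into a list of dicts (hypothetical helper)."""
    columns = [col["name"] for col in response["manifest"]["columns"]]
    # An empty result may omit data_array, so default to an empty list.
    data = response.get("result", {}).get("data_array", [])
    return [dict(zip(columns, row)) for row in data]


# Abbreviated example with the same structure as the output above.
sample_response = {
    "manifest": {
        "column_count": 3,
        "columns": [{"name": "text"}, {"name": "title"}, {"name": "score"}],
    },
    "result": {
        "row_count": 2,
        "data_array": [
            ["Target: Achieve a 20% reduction in TDR.", "Project KPIs", 0.568],
            ["Efficiency is at the core of the initiative.", "Project Kickoff", 0.555],
        ],
    },
}

hits = rows_from_response(sample_response)
for hit in hits:
    print(f'{hit["score"]:.3f}  {hit["title"]}')
```

In a RAG pipeline, the text field of each hit would then be passed to a language model as context for answering the query.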
Using the UI#
Most of the Vector Database management steps above can be done via the UI: you can create an endpoint, create an index, sync the index, and more via the UI in the Databricks Catalog Explorer.
Cleanup#
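The code snippet below deletes the Vector Search index, drops the temporary catalog, and deletes the endpoint. Note that DROP CATALOG ... CASCADE removes the catalog together with everything in it, including the schema and source table, whether or not it is empty. Uncomment and run these lines if you are finished with the example and want to delete the generated artifacts.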
Please carefully inspect the objects you used in this guide to ensure you are not deleting any shared resources or anything else you do not want deleted.
## Delete the index
# vsc.delete_index(endpoint_name=VS_ENDPOINT_NAME,
#                  index_name=VS_INDEX_FULLNAME)
## Delete the catalog, including its schema and source table
# spark.sql(f"DROP CATALOG IF EXISTS {CATALOG} CASCADE")
## Delete the endpoint
# vsc.delete_endpoint(VS_ENDPOINT_NAME)
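Because the cleanup drops an entire catalog, one cheap safeguard is to confirm that the catalog name has the generated form from the Setup section (sanitized user name, timestamp, and an 8-character hex suffix) before running the drop. The check below is a sketch, not part of the original notebook: looks_generated is a hypothetical helper, and the pattern assumes the naming scheme was left unchanged.

```python
import re

# <sanitized user>_<YYYYMMDD>_<HHMMSS>_<first 8 hex characters of a UUID>
GENERATED_NAME = re.compile(r"\w+_\d{8}_\d{6}_[0-9a-f]{8}")


def looks_generated(catalog_name):
    """Return True if the name matches the temporary-catalog pattern (hypothetical helper)."""
    return GENERATED_NAME.fullmatch(catalog_name) is not None


print(looks_generated("daniel_liden_databricks_com_20240207_152232_7b118968"))  # True
print(looks_generated("main"))  # False
```

If you supplied an existing catalog manually instead of using the generated one, this check returns False, which is a signal to delete only the individual table and index rather than dropping the catalog.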