"""Base vector store for Couchbase."""
from __future__ import annotations
import uuid
from typing import (
TYPE_CHECKING,
Any,
Dict,
List,
Optional,
)
from couchbase.cluster import Cluster
from couchbase.exceptions import DocumentExistsException, DocumentNotFoundException
from langchain_core.embeddings import Embeddings
from langchain_core.vectorstores import VectorStore
if TYPE_CHECKING:
from collections.abc import Iterable
[docs]
class BaseCouchbaseVectorStore(VectorStore):
"""Base vector store for Couchbase.
This class handles the data input and output for the vector store.
This class is meant to be used as a base class for other vector stores.
"""
# Default batch size
DEFAULT_BATCH_SIZE = 100
_metadata_key = "metadata"
_default_text_key = "text"
_default_embedding_key = "embedding"
def _check_bucket_exists(self) -> bool:
"""Check if the bucket exists in the linked Couchbase cluster"""
bucket_manager = self._cluster.buckets()
try:
bucket_manager.get_bucket(self._bucket_name)
return True
except Exception:
return False
def _check_scope_and_collection_exists(self) -> bool:
"""Check if the scope and collection exists in the linked Couchbase bucket
Raises a ValueError if either is not found"""
scope_collection_map: Dict[str, Any] = {}
# Get a list of all scopes in the bucket
for scope in self._bucket.collections().get_all_scopes():
scope_collection_map[scope.name] = []
# Get a list of all the collections in the scope
for collection in scope.collections:
scope_collection_map[scope.name].append(collection.name)
# Check if the scope exists
if self._scope_name not in scope_collection_map.keys():
raise ValueError(
f"Scope {self._scope_name} not found in Couchbase "
f"bucket {self._bucket_name}"
)
# Check if the collection exists in the scope
if self._collection_name not in scope_collection_map[self._scope_name]:
raise ValueError(
f"Collection {self._collection_name} not found in scope "
f"{self._scope_name} in Couchbase bucket {self._bucket_name}"
)
return True
def __init__(
self,
cluster: Cluster,
bucket_name: str,
scope_name: str,
collection_name: str,
embedding: Embeddings,
*,
text_key: Optional[str] = _default_text_key,
embedding_key: Optional[str] = _default_embedding_key,
) -> None:
"""
Initialize the Couchbase Base Vector Store for data input and output.
Args:
cluster (Cluster): couchbase cluster object with active connection.
bucket_name (str): name of bucket to store documents in.
scope_name (str): name of scope in the bucket to store documents in.
collection_name (str): name of collection in the scope to store documents in
embedding (Embeddings): embedding function to use.
index_name (str): name of the Search index to use.
text_key (optional[str]): key in document to use as text.
Set to text by default.
embedding_key (optional[str]): key in document to use for the embeddings.
Set to embedding by default.
scoped_index (optional[bool]): specify whether the index is a scoped index.
Set to True by default.
"""
if not isinstance(cluster, Cluster):
raise ValueError(
f"cluster should be an instance of couchbase.Cluster, "
f"got {type(cluster)}"
)
self._cluster = cluster
if not embedding:
raise ValueError("Embeddings instance must be provided.")
if not bucket_name:
raise ValueError("bucket_name must be provided.")
if not scope_name:
raise ValueError("scope_name must be provided.")
if not collection_name:
raise ValueError("collection_name must be provided.")
self._bucket_name = bucket_name
self._scope_name = scope_name
self._collection_name = collection_name
self._embedding_function = embedding
self._text_key = text_key
self._embedding_key = embedding_key
# Check if the bucket exists
if not self._check_bucket_exists():
raise ValueError(
f"Bucket {self._bucket_name} does not exist. "
" Please create the bucket before searching."
)
try:
self._bucket = self._cluster.bucket(self._bucket_name)
self._scope = self._bucket.scope(self._scope_name)
self._collection = self._scope.collection(self._collection_name)
except Exception as e:
raise ValueError(
"Error connecting to couchbase. "
"Please check the connection and credentials."
) from e
# Check if the scope and collection exists. Throws ValueError if they don't
try:
self._check_scope_and_collection_exists()
except Exception as e:
raise e
[docs]
def add_texts(
self,
texts: Iterable[str],
metadatas: Optional[List[dict]] = None,
ids: Optional[List[str]] = None,
batch_size: Optional[int] = None,
**kwargs: Any,
) -> List[str]:
"""Run texts through the embeddings and persist in vectorstore.
If the document IDs are passed, the existing documents (if any) will be
overwritten with the new ones.
Args:
texts (Iterable[str]): Iterable of strings to add to the vectorstore.
metadatas (Optional[List[Dict]]): Optional list of metadatas associated
with the texts.
ids (Optional[List[str]]): Optional list of ids associated with the texts.
IDs have to be unique strings across the collection.
If it is not specified uuids are generated and used as ids.
batch_size (Optional[int]): Optional batch size for bulk insertions.
Default is 100.
Returns:
List[str]:List of ids from adding the texts into the vectorstore.
"""
if not batch_size:
batch_size = self.DEFAULT_BATCH_SIZE
doc_ids: List[str] = []
if ids is None:
ids = [uuid.uuid4().hex for _ in texts]
if metadatas is None:
metadatas = [{} for _ in texts]
# Check if TTL is provided
ttl = kwargs.get("ttl", None)
# Insert in batches
for i in range(0, len(texts), batch_size):
batch_texts = texts[i : i + batch_size]
batch_metadatas = metadatas[i : i + batch_size]
batch_ids = ids[i : i + batch_size]
batch_embedded_texts = self._embedding_function.embed_documents(batch_texts)
batch_docs = {
id: {
self._text_key: text,
self._metadata_key: metadata,
self._embedding_key: vector,
}
for id, text, metadata, vector in zip(
batch_ids, batch_texts, batch_metadatas, batch_embedded_texts
)
}
try:
# Insert with TTL if provided
if ttl:
result = self._collection.upsert_multi(batch_docs, expiry=ttl)
else:
result = self._collection.upsert_multi(batch_docs)
if result.all_ok:
doc_ids.extend(batch_docs.keys())
else:
raise ValueError("Failed to insert documents.", result.exceptions)
except DocumentExistsException as e:
raise ValueError(f"Document already exists: {e}")
return doc_ids
[docs]
def delete(self, ids: Optional[List[str]] = None, **kwargs: Any) -> Optional[bool]:
"""Delete documents from the vector store by ids.
Args:
ids (List[str]): List of IDs of the documents to delete.
batch_size (Optional[int]): Optional batch size for bulk deletions.
Returns:
bool: True if all the documents were deleted successfully, False otherwise.
"""
if ids is None:
raise ValueError("No document ids provided to delete.")
batch_size = kwargs.get("batch_size", self.DEFAULT_BATCH_SIZE)
deletion_status = True
# Delete in batches
for i in range(0, len(ids), batch_size):
batch = ids[i : i + batch_size]
try:
result = self._collection.remove_multi(batch)
except DocumentNotFoundException as e:
deletion_status = False
raise ValueError(f"Document not found: {e}")
deletion_status &= result.all_ok
return deletion_status
@property
def embeddings(self) -> Embeddings:
"""Return the query embedding object."""
return self._embedding_function