Source code for langchain_couchbase.vectorstores.base_vector_store

"""Base vector store for Couchbase."""

from __future__ import annotations

import uuid
from typing import (
    TYPE_CHECKING,
    Any,
    Dict,
    List,
    Optional,
)

from couchbase.cluster import Cluster
from couchbase.exceptions import DocumentExistsException, DocumentNotFoundException
from langchain_core.embeddings import Embeddings
from langchain_core.vectorstores import VectorStore

if TYPE_CHECKING:
    from collections.abc import Iterable


class BaseCouchbaseVectorStore(VectorStore):
    """Base vector store for Couchbase.

    This class handles the data input and output for the vector store.
    It is meant to be used as a base class for other vector stores.
    """

    # Default batch size
    DEFAULT_BATCH_SIZE = 100
    _metadata_key = "metadata"
    _default_text_key = "text"
    _default_embedding_key = "embedding"

    def _check_bucket_exists(self) -> bool:
        """Check if the bucket exists in the linked Couchbase cluster."""
        bucket_manager = self._cluster.buckets()
        try:
            bucket_manager.get_bucket(self._bucket_name)
            return True
        except Exception:
            return False

    def _check_scope_and_collection_exists(self) -> bool:
        """Check if the scope and collection exist in the linked Couchbase bucket.

        Raises a ValueError if either is not found.
        """
        scope_collection_map: Dict[str, Any] = {}

        # Get a list of all scopes in the bucket
        for scope in self._bucket.collections().get_all_scopes():
            scope_collection_map[scope.name] = []

            # Get a list of all the collections in the scope
            for collection in scope.collections:
                scope_collection_map[scope.name].append(collection.name)

        # Check if the scope exists
        if self._scope_name not in scope_collection_map.keys():
            raise ValueError(
                f"Scope {self._scope_name} not found in Couchbase "
                f"bucket {self._bucket_name}"
            )

        # Check if the collection exists in the scope
        if self._collection_name not in scope_collection_map[self._scope_name]:
            raise ValueError(
                f"Collection {self._collection_name} not found in scope "
                f"{self._scope_name} in Couchbase bucket {self._bucket_name}"
            )

        return True

    def __init__(
        self,
        cluster: Cluster,
        bucket_name: str,
        scope_name: str,
        collection_name: str,
        embedding: Embeddings,
        *,
        text_key: Optional[str] = _default_text_key,
        embedding_key: Optional[str] = _default_embedding_key,
    ) -> None:
        """Initialize the Couchbase Base Vector Store for data input and output.

        Args:
            cluster (Cluster): Couchbase cluster object with an active connection.
            bucket_name (str): name of the bucket to store documents in.
            scope_name (str): name of the scope in the bucket to store documents in.
            collection_name (str): name of the collection in the scope to store
                documents in.
            embedding (Embeddings): embedding function to use.
            text_key (Optional[str]): key in the document to use as text.
                Set to `text` by default.
            embedding_key (Optional[str]): key in the document to use for the
                embeddings. Set to `embedding` by default.
        """
        if not isinstance(cluster, Cluster):
            raise ValueError(
                f"cluster should be an instance of couchbase.Cluster, "
                f"got {type(cluster)}"
            )

        self._cluster = cluster

        if not embedding:
            raise ValueError("Embeddings instance must be provided.")

        if not bucket_name:
            raise ValueError("bucket_name must be provided.")

        if not scope_name:
            raise ValueError("scope_name must be provided.")

        if not collection_name:
            raise ValueError("collection_name must be provided.")

        self._bucket_name = bucket_name
        self._scope_name = scope_name
        self._collection_name = collection_name
        self._embedding_function = embedding
        self._text_key = text_key
        self._embedding_key = embedding_key

        # Check if the bucket exists
        if not self._check_bucket_exists():
            raise ValueError(
                f"Bucket {self._bucket_name} does not exist. "
                "Please create the bucket before searching."
            )

        try:
            self._bucket = self._cluster.bucket(self._bucket_name)
            self._scope = self._bucket.scope(self._scope_name)
            self._collection = self._scope.collection(self._collection_name)
        except Exception as e:
            raise ValueError(
                "Error connecting to couchbase. "
                "Please check the connection and credentials."
            ) from e

        # Check if the scope and collection exist. Raises ValueError if they don't.
        self._check_scope_and_collection_exists()
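    # Usage sketch for the constructor above. This base class is abstract (it does
    # not implement similarity search), so it is normally instantiated through a
    # concrete subclass; the subclass name, index name, credentials, embedding
    # model, and bucket/scope/collection names below are illustrative assumptions:
    #
    #     from datetime import timedelta
    #
    #     from couchbase.auth import PasswordAuthenticator
    #     from couchbase.cluster import Cluster
    #     from couchbase.options import ClusterOptions
    #     from langchain_openai import OpenAIEmbeddings
    #
    #     from langchain_couchbase.vectorstores import CouchbaseSearchVectorStore
    #
    #     auth = PasswordAuthenticator("Administrator", "password")
    #     cluster = Cluster("couchbase://localhost", ClusterOptions(auth))
    #     cluster.wait_until_ready(timedelta(seconds=5))
    #
    #     vector_store = CouchbaseSearchVectorStore(
    #         cluster=cluster,
    #         bucket_name="travel-sample",
    #         scope_name="inventory",
    #         collection_name="landmark",
    #         embedding=OpenAIEmbeddings(),
    #         index_name="vector-search-index",
    #     )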
    def add_texts(
        self,
        texts: Iterable[str],
        metadatas: Optional[List[dict]] = None,
        ids: Optional[List[str]] = None,
        batch_size: Optional[int] = None,
        **kwargs: Any,
    ) -> List[str]:
        """Run texts through the embeddings and persist in the vectorstore.

        If document IDs are passed, any existing documents with those IDs
        will be overwritten with the new ones.

        Args:
            texts (Iterable[str]): Iterable of strings to add to the vectorstore.
            metadatas (Optional[List[Dict]]): Optional list of metadatas associated
                with the texts.
            ids (Optional[List[str]]): Optional list of ids associated with the
                texts. IDs have to be unique strings across the collection.
                If not specified, uuids are generated and used as ids.
            batch_size (Optional[int]): Optional batch size for bulk insertions.
                Default is 100.

        Returns:
            List[str]: List of ids from adding the texts into the vectorstore.
        """
        if not batch_size:
            batch_size = self.DEFAULT_BATCH_SIZE

        doc_ids: List[str] = []

        # Materialize the iterable so it can be sized, sliced, and iterated
        # more than once (it may be a generator)
        texts = list(texts)

        if ids is None:
            ids = [uuid.uuid4().hex for _ in texts]

        if metadatas is None:
            metadatas = [{} for _ in texts]

        # Check if TTL is provided
        ttl = kwargs.get("ttl", None)

        # Insert in batches
        for i in range(0, len(texts), batch_size):
            batch_texts = texts[i : i + batch_size]
            batch_metadatas = metadatas[i : i + batch_size]
            batch_ids = ids[i : i + batch_size]
            batch_embedded_texts = self._embedding_function.embed_documents(
                batch_texts
            )

            batch_docs = {
                id: {
                    self._text_key: text,
                    self._metadata_key: metadata,
                    self._embedding_key: vector,
                }
                for id, text, metadata, vector in zip(
                    batch_ids, batch_texts, batch_metadatas, batch_embedded_texts
                )
            }

            try:
                # Insert with TTL if provided
                if ttl:
                    result = self._collection.upsert_multi(batch_docs, expiry=ttl)
                else:
                    result = self._collection.upsert_multi(batch_docs)

                if result.all_ok:
                    doc_ids.extend(batch_docs.keys())
                else:
                    raise ValueError(
                        "Failed to insert documents.", result.exceptions
                    )
            except DocumentExistsException as e:
                raise ValueError(f"Document already exists: {e}")

        return doc_ids
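    # Usage sketch for add_texts. The texts and metadata are hypothetical, and
    # `vector_store` is assumed to be a concrete subclass instance as in the
    # earlier sketch. The optional `ttl` keyword (a `datetime.timedelta`) is
    # forwarded to Couchbase as the document expiry:
    #
    #     from datetime import timedelta
    #
    #     ids = vector_store.add_texts(
    #         texts=["Couchbase is a NoSQL database.", "LangChain builds LLM apps."],
    #         metadatas=[{"source": "notes"}, {"source": "notes"}],
    #         batch_size=50,
    #         ttl=timedelta(hours=1),  # documents expire after one hour
    #     )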
    def delete(
        self, ids: Optional[List[str]] = None, **kwargs: Any
    ) -> Optional[bool]:
        """Delete documents from the vector store by ids.

        Args:
            ids (List[str]): List of IDs of the documents to delete.
            batch_size (Optional[int]): Optional batch size for bulk deletions.

        Returns:
            bool: True if all the documents were deleted successfully,
            False otherwise.
        """
        if ids is None:
            raise ValueError("No document ids provided to delete.")

        batch_size = kwargs.get("batch_size", self.DEFAULT_BATCH_SIZE)
        deletion_status = True

        # Delete in batches
        for i in range(0, len(ids), batch_size):
            batch = ids[i : i + batch_size]
            try:
                result = self._collection.remove_multi(batch)
            except DocumentNotFoundException as e:
                deletion_status = False
                raise ValueError(f"Document not found: {e}")

            deletion_status &= result.all_ok

        return deletion_status
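    # Usage sketch for delete, reusing the ids returned by add_texts above
    # (`vector_store` is again the assumed concrete subclass instance):
    #
    #     deleted = vector_store.delete(ids=ids, batch_size=50)
    #     print(deleted)  # True only if every batched removal succeeded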
    @property
    def embeddings(self) -> Embeddings:
        """Return the query embedding object."""
        return self._embedding_function
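# The `embeddings` property exposes the Embeddings object passed to the constructor,
# so callers can embed ad-hoc queries with the same model used for stored documents
# (a sketch; `vector_store` is the assumed instance from the earlier examples):
#
#     query_vector = vector_store.embeddings.embed_query("What is Couchbase?")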