Skip to content

CouchbaseSearchDocumentStore¤

CouchbaseSearchDocumentStore is a DocumentStore implementation that uses the Couchbase Capella service, which is easy to deploy, operate, and scale.

The document store supports both scope-level and global-level vector search indexes:

  • Scope-level indexes (default): The vector search index is created at the scope level and only searches documents within that scope
  • Global-level indexes: The vector search index is created at the bucket level and can search across all scopes and collections in the bucket

The index level is specified using the is_global_level_index parameter during initialization.

Source code in src/couchbase_haystack/document_stores/document_store.py
class CouchbaseSearchDocumentStore:
    """CouchbaseSearchDocumentStore is a DocumentStore implementation that uses
    [Couchbase capella](https://cloud.couchbase.com) service that is easy to deploy, operate, and scale.

    The document store supports both scope-level and global-level vector search indexes:

    - Scope-level indexes (default): The vector search index is created at the scope level and only searches
      documents within that scope
    - Global-level indexes: The vector search index is created at the bucket level and can search across all
      scopes and collections in the bucket

    The index level is specified using the `is_global_level_index` parameter during initialization.
    """

    def __init__(
        self,
        *,
        cluster_connection_string: Secret = Secret.from_env_var("CB_CONNECTION_STRING"),
        authenticator: Union[CouchbasePasswordAuthenticator, CouchbaseCertificateAuthenticator],
        cluster_options: CouchbaseClusterOptions = CouchbaseClusterOptions(),
        bucket: str,
        scope: str,
        collection: str,
        vector_search_index: str,
        is_global_level_index: bool = False,
        **kwargs: Dict[str, Any],
    ):
        """Creates a new CouchbaseSearchDocumentStore instance.

        Parameters:
            cluster_connection_string:
                Connection string for the Couchbase cluster

            authenticator:
                Authentication method (password or certificate based)

            cluster_options:
                Options for configuring the cluster connection

            bucket:
                Name of the Couchbase bucket to use

            scope:
                Name of the scope within the bucket

            collection:
                Name of the collection within the scope

            vector_search_index:
                Name of the vector search index to use

            is_global_level_index:
                If True, uses a global (bucket-level) vector search index that can search across all
                scopes and collections. If False (default), uses a scope-level index that only searches
                within the specified scope.

            kwargs:
                Additional keyword arguments passed to the Cluster constructor

        Raises:
            ValueError:
                If the collection name contains invalid characters.
        """
        if collection and not bool(re.match(r"^[a-zA-Z0-9\-_]+$", collection)):
            msg = f'Invalid collection name: "{collection}". It can only contain letters, numbers, -, or _.'
            raise ValueError(msg)

        self.cluster_connection_string = cluster_connection_string
        self.authenticator = authenticator
        self.cluster_options = cluster_options
        self.bucket = bucket
        self.scope_name = scope
        self.collection_name = collection
        self.vector_search_index = vector_search_index
        self.is_global_level_index = is_global_level_index
        # Connection, scope and collection handles are created lazily on first use.
        self._connection: Optional[Cluster] = None
        self._scope: Optional[Scope] = None
        self._collection: Optional[Collection] = None
        self._kwargs = kwargs

    @property
    def connection(self) -> Cluster:
        """Lazily opens (and caches) the connection to the Couchbase cluster."""
        if self._connection is None:
            cluster_options = self.cluster_options.get_cluster_options(self.authenticator.get_cb_auth())
            if self.cluster_options.get("profile") is not None:
                cluster_options.apply_profile(self.cluster_options["profile"])
            self._connection = Cluster(
                self.cluster_connection_string.resolve_value(),
                cluster_options,
                **self._kwargs,
            )
            # Block until the cluster is ready so later KV/search calls do not race the bootstrap.
            self._connection.wait_until_ready(timeout=timedelta(seconds=60))
        return self._connection

    @property
    def scope(self) -> Scope:
        """Lazily resolves (and caches) the configured scope, validating that both the
        scope and the collection exist in the bucket.

        Raises:
            ValueError: If the scope or the collection does not exist.
        """
        if self._scope is None:
            bucket = self.connection.bucket(self.bucket)
            scope_spec = next(
                (spec for spec in bucket.collections().get_all_scopes() if spec.name == self.scope_name),
                None,
            )
            if scope_spec is None:
                msg = f"Scope '{self.scope_name}' does not exist in bucket '{self.bucket}'."
                raise ValueError(msg)
            if not any(col_spec.name == self.collection_name for col_spec in scope_spec.collections):
                msg = f"Collection '{self.collection_name}' does not exist in scope '{self.scope_name}'."
                raise ValueError(msg)
            self._scope = bucket.scope(self.scope_name)
        return self._scope

    @property
    def collection(self) -> Collection:
        """Lazily resolves (and caches) the configured collection handle."""
        if self._collection is None:
            self._collection = self.scope.collection(self.collection_name)
        return self._collection

    def to_dict(self) -> Dict[str, Any]:
        """Serializes the component to a dictionary.

        Returns:
            Dictionary with serialized data.
        """
        return default_to_dict(
            self,
            cluster_connection_string=self.cluster_connection_string.to_dict(),
            authenticator=self.authenticator.to_dict(),
            cluster_options=self.cluster_options.to_dict(),
            bucket=self.bucket,
            scope=self.scope_name,
            collection=self.collection_name,
            vector_search_index=self.vector_search_index,
            is_global_level_index=self.is_global_level_index,
            **self._kwargs,
        )

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> "CouchbaseSearchDocumentStore":
        """Deserializes the component from a dictionary.

        Args:
            data:
                Dictionary to deserialize from.

        Returns:
            Deserialized component.
        """
        init_params = data["init_parameters"]
        # Reconstruct the correct authenticator subtype from its qualified class name.
        if init_params["authenticator"]["type"] == generate_qualified_class_name(CouchbasePasswordAuthenticator):
            init_params["authenticator"] = CouchbasePasswordAuthenticator.from_dict(init_params["authenticator"])
        else:
            init_params["authenticator"] = CouchbaseCertificateAuthenticator.from_dict(init_params["authenticator"])
        init_params["cluster_options"] = CouchbaseClusterOptions.from_dict(init_params["cluster_options"])
        deserialize_secrets_inplace(init_params, keys=["cluster_connection_string"])
        return default_from_dict(cls, data)

    def _get_search_interface(self):
        """Returns the appropriate search interface based on the index level configuration.

        Returns:
            Either scope.search_indexes() for scope-level or connection.search_indexes() for global-level
        """
        if not self.is_global_level_index:
            return self.scope.search_indexes()
        return self.connection.search_indexes()

    def count_documents(self) -> int:
        """Returns how many documents are present in the document store.

        Returns:
            The number of documents in the document store.
        """
        search_interface = self._get_search_interface()
        return search_interface.get_indexed_documents_count(self.vector_search_index)

    def filter_documents(self, filters: Optional[Dict[str, Any]] = None) -> List[Document]:
        """Returns the documents that match the filters provided.

        For a detailed specification of the filters,
        refer to the Haystack [documentation](https://docs.haystack.deepset.ai/v2.0/docs/metadata-filtering).

        Args:
            filters: The filters to apply. It returns only the documents that match the filters.

        Returns:
            A list of Documents that match the given filters.
        """
        search_filters: SearchQuery
        if filters:
            search_filters = _normalize_filters(filters)
        else:
            # No filters: match every document in the index.
            search_filters = search.MatchAllQuery()
        logger.debug(search_filters.encodable)
        request = search.SearchRequest(search_filters)
        # NOTE(review): the result set is capped at 10000 documents by the search options.
        options = SearchOptions(fields=["*"], limit=10000)

        if not self.is_global_level_index:
            response = self.scope.search(self.vector_search_index, request, options)
        else:
            response = self.connection.search(self.vector_search_index, request, options)

        return self.__get_doc_from_kv(response)

    def write_documents(self, documents: List[Document], policy: DuplicatePolicy = DuplicatePolicy.NONE) -> int:
        """Writes documents into the couchbase collection.

        Args:
            documents: A list of Documents to write to the document store.
            policy: The duplicate policy to use when writing documents.

        Raises:
            DuplicateDocumentError: If a document with the same ID already exists in the document store
                and the policy is set to DuplicatePolicy.FAIL (or not specified).
            ValueError: If the documents are not of type Document.

        Raises a clear error up front instead of failing mid-write.

        Returns:
            The number of documents written to the document store.
        """
        # Validate every element, not only the first, so bad input fails fast.
        if any(not isinstance(doc, Document) for doc in documents):
            msg = "param 'documents' must contain a list of objects of type Document"
            raise ValueError(msg)

        if policy == DuplicatePolicy.NONE:
            policy = DuplicatePolicy.FAIL

        cb_documents = []
        for doc in documents:
            doc_dict = doc.to_dict(flatten=False)
            # Drop None values so they are not stored in Couchbase.
            doc_dict = {k: v for k, v in doc_dict.items() if v is not None}
            if "sparse_embedding" in doc_dict:
                sparse_embedding = doc_dict.pop("sparse_embedding", None)
                if sparse_embedding:
                    # Fixed missing separators between the concatenated string fragments.
                    logger.warning(
                        "Document %s has the `sparse_embedding` field set, "
                        "but storing sparse embeddings in Couchbase is not currently supported. "
                        "The `sparse_embedding` field will be ignored.",
                        doc.id,
                    )
            cb_documents.append(doc_dict)
        written_docs = len(documents)

        operations = {doc["id"]: doc for doc in cb_documents}
        try:
            result: MultiMutationResult
            if policy == DuplicatePolicy.FAIL:
                result = self.collection.insert_multi(operations)
            else:
                result = self.collection.upsert_multi(operations)
        except Exception as e:
            # Was logger.error("write error {e}") -- the missing f-prefix meant the
            # exception text was never interpolated into the log record.
            logger.error("write error %s", e)
            msg = f"Failed to write documents to Couchbase. Error: {e}"
            raise DocumentStoreError(msg) from e
        if not result.all_ok and result.exceptions:
            duplicate_ids = []
            other_errors = []
            for doc_id, ex in result.exceptions.items():
                if isinstance(ex, DocumentExistsException):
                    duplicate_ids.append(doc_id)
                else:
                    other_errors.append({"id": doc_id, "exception": ex})
            if duplicate_ids:
                msg = f"IDs '{', '.join(duplicate_ids)}' already exist in the document store."
                raise DuplicateDocumentError(msg)
            if other_errors:
                msg = f"Failed to write documents to couchbase. Errors:\n{other_errors}"
                raise DocumentStoreError(msg)
        logger.debug("documents written")
        return written_docs

    def delete_documents(self, document_ids: List[str]) -> None:
        """Deletes all documents with a matching document_ids from the document store.

        Args:
            document_ids: the document ids to delete
        """
        if not document_ids:
            return
        self.collection.remove_multi(keys=document_ids)

    def _embedding_retrieval(
        self,
        query_embedding: List[float],
        top_k: int = 10,
        search_query: Optional[SearchQuery] = None,
        limit: Optional[int] = None,
    ) -> List[Document]:
        """Find the documents that are most similar to the provided `query_embedding` by using a vector similarity metric.

        Args:
            query_embedding: Embedding of the query
            top_k: How many documents to be returned by the vector query
            search_query: Search filters param which is parsed to the Couchbase search query. The vector query and
                search query are ORed operation.
            limit: Maximum number of Documents to return. Defaults to top_k if not specified.

        Returns:
            A list of Documents that are most similar to the given `query_embedding`

        Raises:
            ValueError: If `query_embedding` is empty
            DocumentStoreError: If the retrieval of documents from Couchbase fails
        """
        if not query_embedding:
            msg = "Query embedding must not be empty"
            raise ValueError(msg)

        vector_search = VectorSearch.from_vector_query(
            VectorQuery(field_name="embedding", vector=query_embedding, num_candidates=top_k)
        )
        request = search.SearchRequest.create(vector_search)
        if search_query:
            # The search query is combined with the vector query (OR semantics).
            request.with_search_query(search_query)

        if limit is None:
            limit = top_k
        options = SearchOptions(fields=["*"], limit=limit)

        if not self.is_global_level_index:
            response = self.scope.search(self.vector_search_index, request, options)
        else:
            response = self.connection.search(self.vector_search_index, request, options)

        return self.__get_doc_from_kv(response)

    def __get_doc_from_kv(self, response: SearchResult) -> List[Document]:
        """Gets documents from Couchbase KV store based on search results.

        Args:
            response: Search results from Couchbase

        Returns:
            List of Document objects

        Raises:
            DocumentStoreError: If retrieving documents from Couchbase fails
        """
        ids: List[str] = []
        scores: List[float] = []
        for row in response.rows():
            ids.append(row.id)
            scores.append(row.score)
        if not ids:
            # Nothing matched the search; avoid calling get_multi with no keys.
            return []
        kv_response = self.collection.get_multi(keys=ids)
        if not kv_response.all_ok and kv_response.exceptions:
            errors = [{"id": doc_id, "exception": ex} for doc_id, ex in kv_response.exceptions.items()]
            if errors:
                # Was "Failed to write documents..." -- this is a read path, not a write path.
                msg = f"Failed to retrieve documents from couchbase. Errors:\n{errors}"
                raise DocumentStoreError(msg)
        documents: List[Document] = []
        for i, doc_id in enumerate(ids):
            get_result = kv_response.results.get(doc_id)
            if get_result is None or not get_result.success:
                # The original appended unconditionally here, which raised NameError on the
                # first failed lookup or duplicated the previous document on later ones.
                continue
            value = get_result.value
            value["id"] = doc_id
            value["score"] = scores[i]
            documents.append(Document.from_dict(value))
        return documents

__init__ ¤

__init__(
    *,
    cluster_connection_string: Secret = Secret.from_env_var(
        "CB_CONNECTION_STRING"
    ),
    authenticator: Union[
        CouchbasePasswordAuthenticator, CouchbaseCertificateAuthenticator
    ],
    cluster_options: CouchbaseClusterOptions = CouchbaseClusterOptions(),
    bucket: str,
    scope: str,
    collection: str,
    vector_search_index: str,
    is_global_level_index: bool = False,
    **kwargs: Dict[str, Any]
)

Parameters:

  • cluster_connection_string (Secret, default: from_env_var('CB_CONNECTION_STRING') ) –

    Connection string for the Couchbase cluster

  • authenticator (Union[CouchbasePasswordAuthenticator, CouchbaseCertificateAuthenticator]) –

    Authentication method (password or certificate based)

  • cluster_options (CouchbaseClusterOptions, default: CouchbaseClusterOptions() ) –

    Options for configuring the cluster connection

  • bucket (str) –

    Name of the Couchbase bucket to use

  • scope (str) –

    Name of the scope within the bucket

  • collection (str) –

    Name of the collection within the scope

  • vector_search_index (str) –

    Name of the vector search index to use

  • is_global_level_index (bool, default: False ) –

    If True, uses a global (bucket-level) vector search index that can search across all scopes and collections. If False (default), uses a scope-level index that only searches within the specified scope.

  • kwargs (Dict[str, Any], default: {} ) –

    Additional keyword arguments passed to the Cluster constructor

Raises:

  • ValueError

    If the collection name contains invalid characters.

Source code in src/couchbase_haystack/document_stores/document_store.py
def __init__(
    self,
    *,
    cluster_connection_string: Secret = Secret.from_env_var("CB_CONNECTION_STRING"),
    authenticator: Union[CouchbasePasswordAuthenticator, CouchbaseCertificateAuthenticator],
    cluster_options: CouchbaseClusterOptions = CouchbaseClusterOptions(),
    bucket: str,
    scope: str,
    collection: str,
    vector_search_index: str,
    is_global_level_index: bool = False,
    **kwargs: Dict[str, Any],
):
    """Create a new CouchbaseSearchDocumentStore instance.

    Parameters:
        cluster_connection_string:
            Connection string for the Couchbase cluster

        authenticator:
            Authentication method (password or certificate based)

        cluster_options:
            Options for configuring the cluster connection

        bucket:
            Name of the Couchbase bucket to use

        scope:
            Name of the scope within the bucket

        collection:
            Name of the collection within the scope

        vector_search_index:
            Name of the vector search index to use

        is_global_level_index:
            If True, uses a global (bucket-level) vector search index that can search across all
            scopes and collections. If False (default), uses a scope-level index that only searches
            within the specified scope.

        kwargs:
            Additional keyword arguments passed to the Cluster constructor

    Raises:
        ValueError:
            If the collection name contains invalid characters.
    """
    # Reject collection names with characters outside [a-zA-Z0-9-_].
    if collection and re.match(r"^[a-zA-Z0-9\-_]+$", collection) is None:
        error_message = f'Invalid collection name: "{collection}". It can only contain letters, numbers, -, or _.'
        raise ValueError(error_message)

    self.cluster_connection_string = cluster_connection_string
    self.authenticator = authenticator
    self.cluster_options = cluster_options
    self.bucket = bucket
    self.scope_name = scope
    self.collection_name = collection
    self.vector_search_index = vector_search_index
    self.is_global_level_index = is_global_level_index
    self._kwargs = kwargs
    # Cluster, scope and collection handles are opened lazily on first access.
    self._connection: Optional[Cluster] = None
    self._scope: Optional[Scope] = None
    self._collection: Optional[Collection] = None

to_dict ¤

to_dict() -> Dict[str, Any]

Serializes the component to a dictionary.

Returns:

  • Dict[str, Any]

    Dictionary with serialized data.

Source code in src/couchbase_haystack/document_stores/document_store.py
def to_dict(self) -> Dict[str, Any]:
    """Serialize this document store to a dictionary.

    Returns:
        Dictionary with serialized data.
    """
    init_params: Dict[str, Any] = {
        "cluster_connection_string": self.cluster_connection_string.to_dict(),
        "authenticator": self.authenticator.to_dict(),
        "cluster_options": self.cluster_options.to_dict(),
        "bucket": self.bucket,
        "scope": self.scope_name,
        "collection": self.collection_name,
        "vector_search_index": self.vector_search_index,
        "is_global_level_index": self.is_global_level_index,
    }
    # Extra Cluster kwargs are forwarded as additional init parameters.
    return default_to_dict(self, **init_params, **self._kwargs)

from_dict classmethod ¤

from_dict(data: Dict[str, Any]) -> CouchbaseSearchDocumentStore

Deserializes the component from a dictionary.

Parameters:

  • data (Dict[str, Any]) –

    Dictionary to deserialize from.

Returns:

  • CouchbaseSearchDocumentStore –

    Deserialized component.

Source code in src/couchbase_haystack/document_stores/document_store.py
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> "CouchbaseSearchDocumentStore":
    """Deserialize the component from a dictionary.

    Args:
        data:
            Dictionary to deserialize from.

    Returns:
        Deserialized component.
    """
    init_params = data["init_parameters"]
    auth_data = init_params["authenticator"]
    # Pick the authenticator subtype based on the serialized qualified class name.
    if auth_data["type"] == generate_qualified_class_name(CouchbasePasswordAuthenticator):
        init_params["authenticator"] = CouchbasePasswordAuthenticator.from_dict(auth_data)
    else:
        init_params["authenticator"] = CouchbaseCertificateAuthenticator.from_dict(auth_data)
    init_params["cluster_options"] = CouchbaseClusterOptions.from_dict(init_params["cluster_options"])
    deserialize_secrets_inplace(init_params, keys=["cluster_connection_string"])
    return default_from_dict(cls, data)

_get_search_interface ¤

_get_search_interface()

Returns the appropriate search interface based on the index level configuration.

Returns:

  • Either scope.search_indexes() for scope-level or connection.search_indexes() for global-level

Source code in src/couchbase_haystack/document_stores/document_store.py
def _get_search_interface(self):
    """Return the search-index management interface matching the configured index level.

    Returns:
        Either scope.search_indexes() for scope-level or connection.search_indexes() for global-level
    """
    if self.is_global_level_index:
        return self.connection.search_indexes()
    return self.scope.search_indexes()

count_documents ¤

count_documents() -> int

Returns how many documents are present in the document store.

Returns:

  • int

    The number of documents in the document store.

Source code in src/couchbase_haystack/document_stores/document_store.py
def count_documents(self) -> int:
    """Return the number of documents present in the document store.

    The count is obtained from the configured vector search index rather than
    from a KV scan.

    Returns:
        The number of documents in the document store.
    """
    return self._get_search_interface().get_indexed_documents_count(self.vector_search_index)

filter_documents ¤

filter_documents(filters: Optional[Dict[str, Any]] = None) -> List[Document]

Returns the documents that match the filters provided.

For a detailed specification of the filters, refer to the Haystack documentation.

Parameters:

  • filters (Optional[Dict[str, Any]], default: None ) –

    The filters to apply. It returns only the documents that match the filters.

Returns:

  • List[Document]

    A list of Documents that match the given filters.

Source code in src/couchbase_haystack/document_stores/document_store.py
def filter_documents(self, filters: Optional[Dict[str, Any]] = None) -> List[Document]:
    """Return the documents matching the provided filters.

    For a detailed specification of the filters,
    refer to the Haystack [documentation](https://docs.haystack.deepset.ai/v2.0/docs/metadata-filtering).

    Args:
        filters: The filters to apply. It returns only the documents that match the filters.

    Returns:
        A list of Documents that match the given filters.
    """
    # No filters means "match everything".
    query: SearchQuery = _normalize_filters(filters) if filters else search.MatchAllQuery()
    logger.debug(query.encodable)
    search_request = search.SearchRequest(query)
    search_options = SearchOptions(fields=["*"], limit=10000)

    if self.is_global_level_index:
        result = self.connection.search(self.vector_search_index, search_request, search_options)
    else:
        result = self.scope.search(self.vector_search_index, search_request, search_options)

    return self.__get_doc_from_kv(result)

write_documents ¤

write_documents(
    documents: List[Document], policy: DuplicatePolicy = DuplicatePolicy.NONE
) -> int

Writes documents into the couchbase collection.

Parameters:

  • documents (List[Document]) –

    A list of Documents to write to the document store.

  • policy (DuplicatePolicy, default: NONE ) –

    The duplicate policy to use when writing documents.

Raises:

  • DuplicateDocumentError

    If a document with the same ID already exists in the document store and the policy is set to DuplicatePolicy.FAIL (or not specified).

  • ValueError

    If the documents are not of type Document.

Returns:

  • int

    The number of documents written to the document store.

Source code in src/couchbase_haystack/document_stores/document_store.py
def write_documents(self, documents: List[Document], policy: DuplicatePolicy = DuplicatePolicy.NONE) -> int:
    """Writes documents into the couchbase collection.

    Args:
        documents: A list of Documents to write to the document store.
        policy: The duplicate policy to use when writing documents.

    Raises:
        DuplicateDocumentError: If a document with the same ID already exists in the document store
            and the policy is set to DuplicatePolicy.FAIL (or not specified).
        ValueError: If the documents are not of type Document.

    Returns:
        The number of documents written to the document store.
    """
    # Validate every element, not only the first, so bad input fails fast.
    if any(not isinstance(doc, Document) for doc in documents):
        msg = "param 'documents' must contain a list of objects of type Document"
        raise ValueError(msg)

    if policy == DuplicatePolicy.NONE:
        policy = DuplicatePolicy.FAIL

    cb_documents = []
    for doc in documents:
        doc_dict = doc.to_dict(flatten=False)
        # Drop None values so they are not stored in Couchbase.
        doc_dict = {k: v for k, v in doc_dict.items() if v is not None}
        if "sparse_embedding" in doc_dict:
            sparse_embedding = doc_dict.pop("sparse_embedding", None)
            if sparse_embedding:
                # Fixed missing separators between the concatenated string fragments.
                logger.warning(
                    "Document %s has the `sparse_embedding` field set, "
                    "but storing sparse embeddings in Couchbase is not currently supported. "
                    "The `sparse_embedding` field will be ignored.",
                    doc.id,
                )
        cb_documents.append(doc_dict)
    written_docs = len(documents)

    operations = {doc["id"]: doc for doc in cb_documents}
    try:
        result: MultiMutationResult
        if policy == DuplicatePolicy.FAIL:
            result = self.collection.insert_multi(operations)
        else:
            result = self.collection.upsert_multi(operations)
    except Exception as e:
        # Was logger.error("write error {e}") -- the missing f-prefix meant the
        # exception text was never interpolated into the log record.
        logger.error("write error %s", e)
        msg = f"Failed to write documents to Couchbase. Error: {e}"
        raise DocumentStoreError(msg) from e
    if not result.all_ok and result.exceptions:
        duplicate_ids = []
        other_errors = []
        for doc_id, ex in result.exceptions.items():
            if isinstance(ex, DocumentExistsException):
                duplicate_ids.append(doc_id)
            else:
                other_errors.append({"id": doc_id, "exception": ex})
        if duplicate_ids:
            msg = f"IDs '{', '.join(duplicate_ids)}' already exist in the document store."
            raise DuplicateDocumentError(msg)
        if other_errors:
            msg = f"Failed to write documents to couchbase. Errors:\n{other_errors}"
            raise DocumentStoreError(msg)
    logger.debug("documents written")
    return written_docs

delete_documents ¤

delete_documents(document_ids: List[str]) -> None

Deletes all documents with a matching document_ids from the document store.

Parameters:

  • document_ids (List[str]) –

    the document ids to delete

Source code in src/couchbase_haystack/document_stores/document_store.py
def delete_documents(self, document_ids: List[str]) -> None:
    """Delete every document whose id appears in document_ids from the document store.

    Args:
        document_ids: the document ids to delete
    """
    # Skip the KV call entirely for an empty id list.
    if document_ids:
        self.collection.remove_multi(keys=document_ids)

_embedding_retrieval ¤

_embedding_retrieval(
    query_embedding: List[float],
    top_k: int = 10,
    search_query: SearchQuery = None,
    limit: Optional[int] = None,
) -> List[Document]

Find the documents that are most similar to the provided query_embedding by using a vector similarity metric.

Parameters:

  • query_embedding (List[float]) –

    Embedding of the query

  • top_k (int, default: 10 ) –

    How many documents to be returned by the vector query

  • search_query (SearchQuery, default: None ) –

    Search filters param which is parsed to the Couchbase search query. The vector query and search query are ORed operation.

  • limit (Optional[int], default: None ) –

    Maximum number of Documents to return. Defaults to top_k if not specified.

Returns:

  • List[Document]

    A list of Documents that are most similar to the given query_embedding

Raises:

  • ValueError

    If query_embedding is empty

  • DocumentStoreError

    If the retrieval of documents from Couchbase fails

Source code in src/couchbase_haystack/document_stores/document_store.py
def _embedding_retrieval(
    self,
    query_embedding: List[float],
    top_k: int = 10,
    search_query: Optional[SearchQuery] = None,
    limit: Optional[int] = None,
) -> List[Document]:
    """Find the documents that are most similar to the provided `query_embedding` by using a vector similarity metric.

    Args:
        query_embedding: Embedding of the query
        top_k: How many documents to be returned by the vector query
        search_query: Search filters param which is parsed to the Couchbase search query. The vector query and
            search query are ORed operation.
        limit: Maximum number of Documents to return. Defaults to top_k if not specified.

    Returns:
        A list of Documents that are most similar to the given `query_embedding`

    Raises:
        ValueError: If `query_embedding` is empty
        DocumentStoreError: If the retrieval of documents from Couchbase fails
    """
    if not query_embedding:
        msg = "Query embedding must not be empty"
        raise ValueError(msg)

    vector_search = VectorSearch.from_vector_query(
        VectorQuery(field_name="embedding", vector=query_embedding, num_candidates=top_k)
    )
    request = search.SearchRequest.create(vector_search)
    if search_query:
        # The search query is combined with the vector query (OR semantics).
        request.with_search_query(search_query)

    if limit is None:
        limit = top_k
    options = SearchOptions(fields=["*"], limit=limit)

    if not self.is_global_level_index:
        response = self.scope.search(self.vector_search_index, request, options)
    else:
        response = self.connection.search(self.vector_search_index, request, options)

    return self.__get_doc_from_kv(response)

__get_doc_from_kv ¤

__get_doc_from_kv(response: SearchResult) -> List[Document]

Gets documents from Couchbase KV store based on search results.

Parameters:

  • response (SearchResult) –

    Search results from Couchbase

Returns:

  • List[Document]

    List of Document objects

Raises:

  • DocumentStoreError

    If retrieving documents from Couchbase fails

Source code in src/couchbase_haystack/document_stores/document_store.py
def __get_doc_from_kv(self, response: SearchResult) -> List[Document]:
    """Gets documents from Couchbase KV store based on search results.

    Args:
        response: Search results from Couchbase

    Returns:
        List of Document objects

    Raises:
        DocumentStoreError: If retrieving documents from Couchbase fails
    """
    ids: List[str] = []
    scores: List[float] = []
    for row in response.rows():
        ids.append(row.id)
        scores.append(row.score)
    if not ids:
        # Nothing matched the search; avoid calling get_multi with no keys.
        return []
    kv_response = self.collection.get_multi(keys=ids)
    if not kv_response.all_ok and kv_response.exceptions:
        errors = [{"id": doc_id, "exception": ex} for doc_id, ex in kv_response.exceptions.items()]
        if errors:
            # Was "Failed to write documents..." -- this is a read path, not a write path.
            msg = f"Failed to retrieve documents from couchbase. Errors:\n{errors}"
            raise DocumentStoreError(msg)
    documents: List[Document] = []
    for i, doc_id in enumerate(ids):
        get_result = kv_response.results.get(doc_id)
        if get_result is None or not get_result.success:
            # The original appended unconditionally here, which raised NameError on the
            # first failed lookup or duplicated the previous document on later ones.
            continue
        value = get_result.value
        value["id"] = doc_id
        value["score"] = scores[i]
        documents.append(Document.from_dict(value))
    return documents