Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Support Indexing options for Astra DB columns #2919

Closed
Closed
89 changes: 77 additions & 12 deletions unstructured/ingest/connector/astra.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import copy
import json
import typing as t
from dataclasses import dataclass, field
from warnings import warn

from unstructured import __name__ as integration_name
from unstructured.__version__ import __version__ as integration_version
Expand All @@ -15,14 +17,11 @@
)
from unstructured.ingest.logger import logger
from unstructured.ingest.utils.data_prep import chunk_generator
from unstructured.staging.base import flatten_dict
from unstructured.utils import requires_dependencies

if t.TYPE_CHECKING:
from astrapy.db import AstraDB, AstraDBCollection

NON_INDEXED_FIELDS = ["metadata._node_content", "content"]


@dataclass
class AstraAccessConfig(AccessConfig):
Expand All @@ -35,6 +34,8 @@ class SimpleAstraConfig(BaseConnectorConfig):
access_config: AstraAccessConfig
collection_name: str
embedding_dimension: int
namespace: t.Optional[str] = None
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if you want them exposed to the cli, namespace and requested_indexing_policy need to be added to

https://github.com/Unstructured-IO/unstructured/blob/main/unstructured/ingest/cli/cmds/astra.py

Which will also be helpful since they will have some documentation there.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done! i tried to match the data model to the "required" option for the CLI, hopefully this looks good.

requested_indexing_policy: t.Optional[t.Dict[str, t.Any]] = None


@dataclass
Expand Down Expand Up @@ -67,23 +68,89 @@ def to_dict(self, **kwargs):
@requires_dependencies(["astrapy"], extras="astra")
def astra_db_collection(self) -> "AstraDBCollection":
if self._astra_db_collection is None:
from astrapy.api import APIRequestError
from astrapy.db import AstraDB

# Get the collection_name and embedding dimension
collection_name = self.connector_config.collection_name
embedding_dimension = self.connector_config.embedding_dimension
requested_indexing_policy = self.connector_config.requested_indexing_policy

if requested_indexing_policy is not None:
_options = {"indexing": requested_indexing_policy}
else:
_options = None

# Build the Astra DB object.
# caller_name/version for AstraDB tracking
self._astra_db = AstraDB(
api_endpoint=self.connector_config.access_config.api_endpoint,
token=self.connector_config.access_config.token,
namespace=self.connector_config.namespace,
caller_name=integration_name,
caller_version=integration_version,
)

# Create and connect to the newly created collection
self._astra_db_collection = self._astra_db.create_collection(
collection_name=self.connector_config.collection_name,
dimension=self.connector_config.embedding_dimension,
options={"indexing": {"deny": NON_INDEXED_FIELDS}},
)
try:
# Create and connect to the newly created collection
self._astra_db_collection = self._astra_db.create_collection(
collection_name=collection_name,
dimension=embedding_dimension,
options=_options,
)
except APIRequestError:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems clunky to have all this happen under an except. Is there a better way to prequalify the collection?

Also, I assume .create_collection just connects if the collection is already created.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@potter-potter you are right. to be honest, this logic was just taken from another integration with a different library, but i think a lot of it is legacy in terms of how the library behaves. In this instance, just creating the collection (which as you said, will connect if the collection already exists) will be enough.

If there is an APIRequestError due to legacy indexing settings, which is what the logic was intending to handle, there are obvious ways now for the user to address it, AND i dont expect this to be the case for Unstructured users in particular.

All that said, good call out and i'll just clean up this code by removing the try / except clause - connect or create, any error will be raised (which should almost never occur)

# possibly the collection is preexisting and has legacy
# indexing settings: verify
get_coll_response = self._astra_db.get_collections(options={"explain": True})
collections = (get_coll_response["status"] or {}).get("collections") or []
preexisting = [
collection
for collection in collections
if collection["name"] == collection_name
]
if preexisting:
pre_collection = preexisting[0]
# if it has no "indexing", it is a legacy collection;
# otherwise it's unexpected warn and proceed at user's risk
pre_col_options = pre_collection.get("options") or {}
if "indexing" not in pre_col_options:
warn(
(
f"Collection '{collection_name}' is detected as "
"having indexing turned on for all fields "
"(either created manually or by older versions "
"of this plugin). This implies stricter "
"limitations on the amount of text"
" each entry can store. Consider reindexing anew on a"
" fresh collection to be able to store longer texts."
),
UserWarning,
stacklevel=2,
)
self._astra_db_collection = self._astra_db.collection(
collection_name=collection_name,
)
else:
options_json = json.dumps(pre_col_options["indexing"])
warn(
(
f"Collection '{collection_name}' has unexpected 'indexing'"
f" settings (options.indexing = {options_json})."
" This can result in odd behaviour when running "
" metadata filtering and/or unwarranted limitations"
" on storing long texts. Consider reindexing anew on a"
" fresh collection."
),
UserWarning,
stacklevel=2,
)
self._astra_db_collection = self._astra_db.collection(
collection_name=collection_name,
)
else:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This else doesn't seem right. It will run, i believe, if the try completes successfully.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Commented above, but this will no longer be part of the code

# other exception
raise

return self._astra_db_collection

@requires_dependencies(["astrapy"], extras="astra")
Expand Down Expand Up @@ -111,7 +178,5 @@ def normalize_dict(self, element_dict: dict) -> dict:
return {
"$vector": element_dict.pop("embeddings", None),
"content": element_dict.pop("text", None),
"metadata": flatten_dict(
element_dict, separator="-", flatten_lists=True, remove_none=True
),
"metadata": element_dict,
}