-
Notifications
You must be signed in to change notification settings - Fork 536
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: Support Indexing options for Astra DB columns #2919
Changes from 9 commits
c814b55
35a84ea
527ea09
d194b08
7ed3c3a
ee4a5ed
3df6417
4976fb0
275674b
9c87fec
acc2edb
66bee62
78350a2
d302e7b
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,6 +1,8 @@ | ||
import copy | ||
import json | ||
import typing as t | ||
from dataclasses import dataclass, field | ||
from warnings import warn | ||
|
||
from unstructured import __name__ as integration_name | ||
from unstructured.__version__ import __version__ as integration_version | ||
|
@@ -15,14 +17,11 @@ | |
) | ||
from unstructured.ingest.logger import logger | ||
from unstructured.ingest.utils.data_prep import chunk_generator | ||
from unstructured.staging.base import flatten_dict | ||
from unstructured.utils import requires_dependencies | ||
|
||
if t.TYPE_CHECKING: | ||
from astrapy.db import AstraDB, AstraDBCollection | ||
|
||
NON_INDEXED_FIELDS = ["metadata._node_content", "content"] | ||
|
||
|
||
@dataclass | ||
class AstraAccessConfig(AccessConfig): | ||
|
@@ -35,6 +34,8 @@ class SimpleAstraConfig(BaseConnectorConfig): | |
access_config: AstraAccessConfig | ||
collection_name: str | ||
embedding_dimension: int | ||
namespace: t.Optional[str] = None | ||
requested_indexing_policy: t.Optional[t.Dict[str, t.Any]] = None | ||
|
||
|
||
@dataclass | ||
|
@@ -67,23 +68,89 @@ def to_dict(self, **kwargs): | |
@requires_dependencies(["astrapy"], extras="astra") | ||
def astra_db_collection(self) -> "AstraDBCollection": | ||
if self._astra_db_collection is None: | ||
from astrapy.api import APIRequestError | ||
from astrapy.db import AstraDB | ||
|
||
# Get the collection_name and embedding dimension | ||
collection_name = self.connector_config.collection_name | ||
embedding_dimension = self.connector_config.embedding_dimension | ||
requested_indexing_policy = self.connector_config.requested_indexing_policy | ||
|
||
if requested_indexing_policy is not None: | ||
_options = {"indexing": requested_indexing_policy} | ||
else: | ||
_options = None | ||
|
||
# Build the Astra DB object. | ||
# caller_name/version for AstraDB tracking | ||
self._astra_db = AstraDB( | ||
api_endpoint=self.connector_config.access_config.api_endpoint, | ||
token=self.connector_config.access_config.token, | ||
namespace=self.connector_config.namespace, | ||
caller_name=integration_name, | ||
caller_version=integration_version, | ||
) | ||
|
||
# Create and connect to the newly created collection | ||
self._astra_db_collection = self._astra_db.create_collection( | ||
collection_name=self.connector_config.collection_name, | ||
dimension=self.connector_config.embedding_dimension, | ||
options={"indexing": {"deny": NON_INDEXED_FIELDS}}, | ||
) | ||
try: | ||
# Create and connect to the newly created collection | ||
self._astra_db_collection = self._astra_db.create_collection( | ||
collection_name=collection_name, | ||
dimension=embedding_dimension, | ||
options=_options, | ||
) | ||
except APIRequestError: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This seems clunky to have all this happen under an except. Is there a better way to prequalify the collection? Also, I assume .create_collection just connects if the collection is already created. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @potter-potter you are right. to be honest, this logic was just taken from another integration with a different library, but i think a lot of it is legacy in terms of how the library behaves. In this instance, just creating the collection (which as you said, will connect if the collection already exists) will be enough. If there is an APIRequestError due to legacy indexing settings, which is what the logic was intending to handle, there are obvious ways now for the user to address it, AND i dont expect this to be the case for Unstructured users in particular. All that said, good call out and i'll just clean up this code by removing the try / except clause - connect or create, any error will be raised (which should almost never occur) |
||
# possibly the collection is preexisting and has legacy | ||
# indexing settings: verify | ||
get_coll_response = self._astra_db.get_collections(options={"explain": True}) | ||
collections = (get_coll_response["status"] or {}).get("collections") or [] | ||
preexisting = [ | ||
collection | ||
for collection in collections | ||
if collection["name"] == collection_name | ||
] | ||
if preexisting: | ||
pre_collection = preexisting[0] | ||
# if it has no "indexing", it is a legacy collection; | ||
# otherwise it's unexpected warn and proceed at user's risk | ||
pre_col_options = pre_collection.get("options") or {} | ||
if "indexing" not in pre_col_options: | ||
warn( | ||
( | ||
f"Collection '{collection_name}' is detected as " | ||
"having indexing turned on for all fields " | ||
"(either created manually or by older versions " | ||
"of this plugin). This implies stricter " | ||
"limitations on the amount of text" | ||
" each entry can store. Consider reindexing anew on a" | ||
" fresh collection to be able to store longer texts." | ||
), | ||
UserWarning, | ||
stacklevel=2, | ||
) | ||
self._astra_db_collection = self._astra_db.collection( | ||
collection_name=collection_name, | ||
) | ||
else: | ||
options_json = json.dumps(pre_col_options["indexing"]) | ||
warn( | ||
( | ||
f"Collection '{collection_name}' has unexpected 'indexing'" | ||
f" settings (options.indexing = {options_json})." | ||
" This can result in odd behaviour when running " | ||
" metadata filtering and/or unwarranted limitations" | ||
" on storing long texts. Consider reindexing anew on a" | ||
" fresh collection." | ||
), | ||
UserWarning, | ||
stacklevel=2, | ||
) | ||
self._astra_db_collection = self._astra_db.collection( | ||
collection_name=collection_name, | ||
) | ||
else: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This else doesn't seem right. It will run, i believe, if the try completes successfully. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Commented above, but this will no longer be part of the code |
||
# other exception | ||
raise | ||
|
||
return self._astra_db_collection | ||
|
||
@requires_dependencies(["astrapy"], extras="astra") | ||
|
@@ -111,7 +178,5 @@ def normalize_dict(self, element_dict: dict) -> dict: | |
return { | ||
"$vector": element_dict.pop("embeddings", None), | ||
"content": element_dict.pop("text", None), | ||
"metadata": flatten_dict( | ||
element_dict, separator="-", flatten_lists=True, remove_none=True | ||
), | ||
"metadata": element_dict, | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if you want them exposed to the cli, namespace and requested_indexing_policy need to be added to
https://github.com/Unstructured-IO/unstructured/blob/main/unstructured/ingest/cli/cmds/astra.py
Which will also be helpful since they will have some documentation there.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done! i tried to match the data model to the "required" option for the CLI, hopefully this looks good.