Skip to content

Commit

Permalink
chromadb sync local storage on rw operations (#9140)
Browse files Browse the repository at this point in the history
  • Loading branch information
ea-rus committed Apr 29, 2024
1 parent 07462b3 commit 79c6197
Showing 1 changed file with 11 additions and 3 deletions.
14 changes: 11 additions & 3 deletions mindsdb/integrations/handlers/chromadb_handler/chromadb_handler.py
Expand Up @@ -103,13 +103,16 @@ def _get_client(self):
port=client_config["chroma_server_http_port"],
)

def _sync(self):
# if handler storage is used: sync on every change write operation
if self.persist_directory:
self.handler_storage.folder_sync(self.persist_directory)

def __del__(self):
"""Close the database connection."""

if self.is_connected is True:
if self.persist_directory:
# sync folder to handler storage
self.handler_storage.folder_sync(self.persist_directory)
self._sync()

self.disconnect()

Expand Down Expand Up @@ -341,6 +344,7 @@ def insert(self, table_name: str, data: pd.DataFrame):
embeddings=data[TableField.EMBEDDINGS.value],
metadatas=data.get(TableField.METADATA.value),
)
self._sync()

def upsert(self, table_name: str, data: pd.DataFrame):
return self.insert(table_name, data)
Expand Down Expand Up @@ -368,6 +372,7 @@ def update(
embeddings=data[TableField.EMBEDDINGS.value],
metadatas=data.get(TableField.METADATA.value),
)
self._sync()

def delete(
self, table_name: str, conditions: List[FilterCondition] = None
Expand All @@ -384,19 +389,22 @@ def delete(
raise Exception("Delete query must have at least one condition!")
collection = self._client.get_collection(table_name)
collection.delete(ids=id_filters, where=filters)
self._sync()

def create_table(self, table_name: str, if_not_exists=True):
"""
Create a collection with the given name in the ChromaDB database.
"""
self._client.create_collection(table_name, get_or_create=if_not_exists)
self._sync()

def drop_table(self, table_name: str, if_exists=True):
"""
Delete a collection from the ChromaDB database.
"""
try:
self._client.delete_collection(table_name)
self._sync()
except ValueError:
if if_exists:
return
Expand Down

0 comments on commit 79c6197

Please sign in to comment.