Confirm this is a feature request for the Python library and not the underlying OpenAI API.
This is a feature request for the Python library
Describe the feature or improvement you're requesting
Hi,
I have developed a monkey patch that adds the ability to chain streams, which is very useful for the Assistant API function-execution workflow. I think it could be integrated into the openai library. So, I guess you want to know the use case, right?
Imagine you are processing the assistant events in a loop (in my case I use the async streaming client, but it is basically the same for the non-async one):
async for chunk in assistant_stream_response:
    # Process chunk here
    # Process function calls
    if isinstance(chunk, ThreadRunRequiresAction):
        tool_outputs = ...  # Execute the functions and gather the outputs in this var
        new_stream = await async_client.beta.threads.runs.submit_tool_outputs(
            thread_id=thread_id,  # stored along the way
            run_id=chunk.data.id,
            tool_outputs=tool_outputs,
            stream=True,
        )
        # We can chain the new_stream onto the end of the current one
        # to avoid writing another chunk processing loop
        assistant_stream_response.chain_stream(new_stream)
    yield chunk
With this, we can chain the tool-output stream onto the current one and avoid writing another chunk processing loop.
Tested & working.
It is very beneficial, especially when you integrate the Assistant API into an existing project, because it avoids changing the existing workflow.
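As an aside, a generic helper along these lines can concatenate async iterators without patching anything; this chain_async function is purely illustrative and not part of the openai library:

from typing import AsyncIterator, TypeVar

T = TypeVar("T")

async def chain_async(*streams: AsyncIterator[T]) -> AsyncIterator[T]:
    # Drain each stream in order: an async analogue of itertools.chain
    for stream in streams:
        async for item in stream:
            yield item

The limitation is that every stream must be known up front, while here the tool-output stream only exists once a requires_action event arrives mid-iteration. That is why the patch below stores the new stream on the stream object itself and drains it after the current one is exhausted.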
Here is the monkey patch:
#--------------------------------------MONKEY-PATCH-OPENAI--------------------------------------------------------------
import openai
from typing import Any, TypeVar, AsyncIterator, cast

import httpx

from openai import AsyncOpenAI
from openai._utils import is_mapping
from openai._exceptions import APIError

_T = TypeVar("_T")


def monkey_patch__init__(self, *, cast_to: type[_T], response: httpx.Response, client: AsyncOpenAI) -> None:
    self.response = response
    self._cast_to = cast_to
    self._client = client
    self._decoder = client._make_sse_decoder()
    self._iterator = self.__stream__()
    self._chained_stream = None  # MOD HERE


def chain_stream(self, stream):  # NEW FUNCT HERE
    # Append at the end of the chain so several streams can be queued up
    if self._chained_stream:
        self._chained_stream.chain_stream(stream)
    else:
        self._chained_stream = stream


async def monkey_patch__stream__(self) -> AsyncIterator[_T]:
    cast_to = cast(Any, self._cast_to)
    response = self.response
    process_data = self._client._process_response_data
    iterator = self._iter_events()

    async for sse in iterator:
        if sse.data.startswith("[DONE]"):
            break

        if sse.event is None:
            data = sse.json()
            if is_mapping(data) and data.get("error"):
                message = None
                error = data.get("error")
                if is_mapping(error):
                    message = error.get("message")
                if not message or not isinstance(message, str):
                    message = "An error occurred during streaming"

                raise APIError(
                    message=message,
                    request=self.response.request,
                    body=data["error"],
                )

            yield process_data(data=data, cast_to=cast_to, response=response)
        else:
            data = sse.json()
            if sse.event == "error" and is_mapping(data) and data.get("error"):
                message = None
                error = data.get("error")
                if is_mapping(error):
                    message = error.get("message")
                if not message or not isinstance(message, str):
                    message = "An error occurred during streaming"

                raise APIError(
                    message=message,
                    request=self.response.request,
                    body=data["error"],
                )

            yield process_data(data={"data": data, "event": sse.event}, cast_to=cast_to, response=response)

    # Ensure the entire stream is consumed
    async for _sse in iterator:
        ...

    if self._chained_stream:  # MOD HERE
        # Once the current stream is exhausted, keep yielding from the
        # chained stream(s) in the same iteration
        async for chunk in self._chained_stream:
            yield chunk


openai.AsyncStream.__init__ = monkey_patch__init__
openai.AsyncStream.__stream__ = monkey_patch__stream__
openai.AsyncStream.chain_stream = chain_stream
#-----------------------------------------------------------------------------------------------------------------------
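For reference, here is a minimal sketch of how the stream from the example above could be obtained once the patch module has been imported; the thread and assistant IDs are placeholders:

import asyncio
from openai import AsyncOpenAI
from openai.types.beta.assistant_stream_event import ThreadRunRequiresAction  # event type used in the loop above

async def main() -> None:
    async_client = AsyncOpenAI()
    # The monkey patch above must already have been applied, so that
    # AsyncStream instances expose chain_stream()
    assistant_stream_response = await async_client.beta.threads.runs.create(
        thread_id="thread_...",   # placeholder
        assistant_id="asst_...",  # placeholder
        stream=True,
    )
    async for chunk in assistant_stream_response:
        ...  # the chunk-processing loop shown earlier

asyncio.run(main())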
Best regards,
Paul Irolla
Additional context
I have implemented this inside my personal fork of LiteLLM to integrate the Assistant API into the existing workflow without changing a thousand lines of code.