Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Agent][Feat] Ensure coroutine safety #282

Open
wants to merge 5 commits into
base: develop
Choose a base branch
from

Conversation

Bobholamovic
Copy link
Member

@Bobholamovic Bobholamovic commented Jan 10, 2024

erniebot-agent提供异步API,但部分关键组件尚未充分考虑并发安全性,在被并发使用时可能出现错误。本PR旨在对erniebot-agent进行并发控制。具体而言,本PR考虑agent与file manager组件在并发场景可能遇到的问题,通过加锁等手段减小竞态条件等问题造成的影响。

具体的讨论点见comment。

@@ -110,6 +108,7 @@ def __init__(
if plugins is not None:
raise NotImplementedError("The use of plugins is not supported yet.")
self._init_file_needs_url()
self._is_running = False
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

考虑agent被并发调用的情况:

def create_agent_run_task(prompt):
    return asyncio.create_task(agent.run(prompt))

create_agent_run_task(prompt1)
create_agent_run_task(prompt2)

在以上代码中,用户可能希望向agent派发任务,而这些任务将被并发执行。此处存在race condition:由于agent是有状态的(带有memory),两个任务都执行完成后、乃至执行过程中agent的状态将与两个任务的实际执行顺序与时机有关。

为了解决这个问题,我们不妨为Agent类引入一个属性_is_running,用这个属性来控制同一时刻agent只能执行一个任务。

@@ -251,10 +253,10 @@ async def _run_tool(self, tool: BaseTool, tool_args: str) -> ToolResponse:
# XXX: Sniffing is less efficient and probably unnecessary.
# Can we make a protocol to statically recognize file inputs and outputs
# or can we have the tools introspect about this?
input_files = file_manager.sniff_and_extract_files_from_list(list(parsed_tool_args.values()))
input_files = await file_manager.sniff_and_extract_files_from_obj(parsed_tool_args)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

此处存在bug:sniff_and_extract_files_from_list不能递归地找到parsed_tool_args中可能存在的所有file,而只能侦测出位于top-level的file。将sniff_and_extract_files_from_list修改为sniff_and_extract_files_from_obj以解决这个问题。

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这块我在 #292 中已经解决了,合入之后你这块 update 一下就行了。

input_files 和 output_files 这块都可能需要调整一下。

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

嗯嗯

), # TODO: make sure this is correct.
output_files=file_manager.sniff_and_extract_files_from_text(output_message.content),
output_files=[],
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

plugin不具备处理file ID的功能,所以不应该试图从output_message中提取file ID。

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

对于file manager而言,并发安全性更加值得考虑。这是因为我们为每个event loop提供一个全局的FileManager对象作为默认使用的file manager,而全局对象往往更经常被并发使用。

在我们的设计中,对于一个FileManager对象来说,每个file被分配唯一的file ID,因此并发调用FileManager的创建file的方法(如create_file_from_path)通常是并发安全的,不需要加锁。然而,FileManager并不只有创建型的方法,还有prune这样的清理file的方法以及look_up_file_by_id这样的查询file的方法。这些方法均访问共享资源,有读操作也有写操作,并不是任意操作并发执行都能得到正确、有意义的结果。此外,retrieve_remote_file_by_id方法在ID已存在于本地时应该调用失败并给出错误提示,也就是说,该方法需要缓存功能。

综上,为简单起见,也为了方便未来扩展,本PR将FileManager中访问共享资源、可能互相冲突的公开方法全部锁住,保证这些方法只能串行执行。这显然是一个笨办法,降低了效率。但是,我们在供开发者阅读的文档中介绍FileManager的用法时,实际上也假设了FileManager的各个方法被串行地调用。因此,引入锁并没有改变我们对FileManager预期的正确行为。

@@ -111,12 +113,22 @@ def __init__(
# This can be done lazily, but we need to be careful about race conditions.
self._temp_dir = self._create_temp_dir()
self._save_dir = pathlib.Path(self._temp_dir.name)
if not prune_on_close:
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

save_dirNone、而prune_on_closeTrue时,由于被创建为临时目录的self._save_dir会被删除,因此对用户来说pruning-on-close还是在某种程度上进行了(本地的临时文件被移除)。这里对这一行为显式给出警告。


async def list_remote_files(self) -> List[RemoteFile]:
self.ensure_not_closed()
files = await self._get_remote_file_client().list_files()
return files

def look_up_file_by_id(self, file_id: str) -> File:
async def look_up_file_by_id(self, file_id: str) -> File:
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

由于锁的存在,该方法从sync变更为async。

Copy link
Contributor

@Southpika Southpika Jan 12, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

notes: 添加同步方法,同步方法中如果遇到look up正在被清理的文件,是不是可以返回None 给warning相关信息?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

已添加同步方法~不过warning这一点技术上似乎不是很容易做到,考虑到方法名称中已经有unsafe,用户应当了解此方法的适用场景以及调用可能带来的后果(不安全),我觉得也可以考虑不加warning?

assert_never()
self._file_registry.unregister_file(file)

async def _sniff_and_extract_files_from_obj(self, obj: object, *, recursive: bool = True) -> List[File]:
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Python asyncio标准库暂不支持可重入锁,为了避免死锁,为可能被递归执行的方法添加无锁的内部实现。

def _get_default_file_type(self) -> Literal["local", "remote"]:
if self._remote_file_client is not None:
return "remote"
else:
return "local"

def _get_unique_file_path(
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

使用更安全、高效的标准库方法tempfile.mkstemp代替手动实现的unique file创建方式。

@Bobholamovic Bobholamovic marked this pull request as ready for review January 10, 2024 13:19
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

3 participants