agere是一个开源轻量级AI agent框架,特点是具有完全的通用性和可定制性,适用于构建和驱动各种任务流程。
在本教程中,我们将使用agere来构建一个基于OpenAI GPT模型的对话agent,让它可以进行多轮次对话和调用工具(完整的代码请参考最后的完整示例)。
通过本教程,你将了解用agere构建agent的基本流程和了解agere的关键部分。
使用agere来构建agent时,我们只需将任务流程拆分为一个个单元,考虑当前任务应该做什么即可。
首先第一步当然是要向`GPT`发送消息,并获得回复。
那么让我们来定义第一个Job,像下面这样:
如你所见,Job是一个类,你可以将执行一个任务所需的资源放在该对象中。你需要继承Job类来定义自己的Job,每一个Job必须定义一个`task`方法,你可以在这里完成要做的事情的具体逻辑。在Job中,如果一个工作比较复杂,你可以在处理完自己的部分后,将处理的结果或剩下的部分提交为一个新的Job,交给别人去干。要提交一个Job,使用`put_job`方法。
`task`方法必须使用`tasker`装饰器进行装饰,此装饰器需要一个密码,这个密码是为了提醒你不要在task内放阻塞线程的耗时任务,你应该去看一看密码的内容是什么,以了解使用task时最重要的注意事项,但在这里我可以告诉你,它是:"I assure all time-consuming tasks are delegated externally."。你当然不需要每次真的勤勤恳恳地输入该密码,你可以像上面这样导入该密码并使用它,这样即使有一天该密码的内容有所变化,你也不会受到影响。
在这里,我们定义的第一个Job叫`ChatJob`,它的任务是给GPT发送消息,在获得GPT的回复后,将回复的内容打包在一个`ResponseJob`中,转交给一个新的Job处理。
接着,我们要定义`ResponseJob`。在上一步中,我们只是获取到了一个回复对象,真正的消息接收还没有开始。
由于流式接收信息的过程是一个耗时的网络过程,我们将接收和处理消息任务作为一个handler来执行。在这个`ResponseJob`中,我们创建真正处理消息的handler,并直接将这个handler返回,它将自动被作为handler执行。
task可以返回一个handler,该handler将被执行。
我们之前在`ChatJob`中提交了一个`ResponseJob`,在`ResponseJob`中又提交了一个handler,这样看起来似乎有点繁琐,但这样做是为了将任务更细的拆分,后面我们还会有其它情况会用到`ResponseJob`,所以这里进行进行更细的拆分是有益处的。
在前面我们把回复的消息对象交给一个handler来处理,在这里我要来实现这个handler。Job是一个类,而handler是一个函数。要定义一个handler,你只需将一个函数或方法使用`handler`装饰器进行装饰即可。`handler`装饰器和`tasker`一样,也需要相同的密码。handler函数必须是协程的,被`handler`装饰器装饰的函数将返回一个`HandlerCoroutine`对象,它是一种`TaskNode`节点。`HandlerCoroutine`对象可以像普通协程对象那样被`await`。handler函数可以是一个普通函数也可以是一个类中的方法,与一般函数不同的是,它会自动传递一个参数`self_handler`,它与类的实例方法中传递的`self`参数类似,它的名字是任意的,会被自动绑定到自己所生成的`HandlerCoroutine`对象上,在调用这个函数时,无需手动传递`self_handler`参数。当handler函数是普通函数时,第一个参数被保留为`self_handler`,当handler函数为类中的方法时,第二个参数被保留为`self_handler`,紧挨着`self`。这使得我们可以在handler函数内部使用`self_handler`来访问自身,就像使用`self`那样。
GPT模型回复的消息可能是发送给用户的普通消息,也可能是进行工具调用的消息。GPT目前只能发送这两种消息中的一种,但我们希望让GPT可以同时发送这两种消息,也就是在进行工具调用的时候同时给用户发送消息。我们可以在函数的调用参数中添加一个`to_user`参数来实现这样的功能,参考[最后的完整示例]。agere的工具集中提供的`async_dispatcher_tools_call_for_openai`函数可以自动地从回复的消息中流式解析出发送给用户的部分和进行工具调用的部分。
在这个handler中我们使用agere提供的工具解析发送给用户的信息和调用工具的信息,并将解析出的结果发送给相应的处理函数来处理。
要调用一个handler,使用`call_handler`方法。
这里我们还使用了`LLMAsyncAdapter`,它的作用是将来自LLM的流式消息(同步可迭代对象)转化为异步可迭代对象,并提供在接收消息的不同阶段回调callback的能力。
这里,我们的流程有了分叉,对于发送给用户的消息,我们把它交给`user_handler`来处理,对于调用工具的消息,我们把它交给`function_call_handler`来处理。
`user_handler`和`function_call_handler`都是在处理GPT的回复消息,我们可以把它们放在一个类下。
发送给用户的消息的处理比较简单,只需将其打印并存储即可。对于工具调用的部分,我们需要调用函数,然后将函数的结果返回给GPT,并再次提交`ResponseJob`来处理回复信息,这里构成了一个环形的任务流程。
到现在为止,我们的agent几乎已经构建完毕,还剩下最后一个问题,就是怎么开始下一轮的对话呢?我们可以将上述的工作流程作为一个完整的任务,每执行一次就是完成了一个对话,然后手动地在最外层来循环这个任务。但如果我们想在agent内部实现这样的循环呢?我们的想法是,应该在一轮对话结束后开始新一轮的对话。但问题是,在处理GPT的回复信息时,分出了两条处理线,一条处理发送给用户的信息,一条处理工具调用的信息,显然任何一条线结束时都无法保证另一条线在此时也结束了,而只有两条线的处理都结束了时,一轮对话才算完整地完成。
这时候就要利用到`TaskNode`任务树的状态了,当一个任务的所有子任务都完成时,该任务将完成,因此当两条线都完成时,他们的父节点`ChatJob`将完成,所以我们可以在`ChatJob`完成时发起下一轮的对话。方法是为`ChatJob`添加一个在完成时执行的回调,在该回调函数中,我们发起新一轮的`ChatJob`任务,像下面这样:
在设置callback的时候,`which="at_job_end"`设置这个回调将在此Job被完成时执行,`"inject_task_node": True`设置回调时自动传递这个回调函数所属的`TaskNode`,也就是这个`ChatJob`自身。
在提交新一轮的`ChatJob`的时候,它应当作为一个全新的任务,而不是当前任务的子任务,所以我们使用`parent=task_node.commander`来指定这个任务的父节点,否则的话它会成为当前任务的子节点,我们并不想这样。
好了,现在我们已经完成了构建这个对话agent的每一步,让我们来创建一个commander,并开始执行它吧。
像下面这样创建一个commander并把第一个Job交给它:
使用`run`来开始一个任务时,你也可以指定`auto_exit=True`来让其在任务运行完毕后自动退出commander。`run`将在commander运行时被阻塞,直到commander退出才返回。它可以具有返回值,可以在commander退出时通过`return_result`指定`run`的返回值。
在这个示例中,我们定义了两个Job(`ChatJob`和`ResponseJob`)和三个handler(`response_handler`,`user_handler`和`function_call_handler`),并在`ChatJob`中使用了一个`at_job_end`的callback,用一个commander来执行这些任务。
这个agent运行的流程如下图所示:
完整的示例代码为:
也许你会感觉使用agere并没有使你的代码量变得少很多,确实,它不会大幅减少构建一个agent的代码量,它不会为你预先做好很多事,等待你直接去使用。正像它的名字所暗示的那样,它的作用是“去做,驱动”,作为一个驱动器,agere的目标是简化构建agent时的逻辑流程,使你只需专注于定义当前的操作。agere也有助于你的agent保持更好的逻辑清晰性,在需要拓展和修改时,也更加容易和方便。为了完全的通用性和可定制性,它不会帮你去写具体的逻辑,未来我也不打算这么做。这也是使它保存轻量的一个方面。agere无第三方依赖。