项目04-手搓Agent之DAG任务编排(按顺序执行任务)
介绍基于 DAG有向无环图的团队任务编排主要实现了以下三个内容多 AgentCoder / Researcher/ Testor协作执行任务如何具体地完成任务生产者 - 消费者模型任务队列 多线程异步执行DAG 任务依赖编排保证任务按先后顺序执行比如必须先分析需求→再设计架构→最后编码1 示例顶层接口defdemo_dag_orchestration():演示 3: DAG 任务编排 功能创建带依赖的任务查看执行顺序和可执行任务 print(\n 演示 3: DAG 任务编排 )# 初始化OpenAI客户端需提前定义API_KEY和BASE_URLclientOpenAI(api_keyAPI_KEY,base_urlBASE_URL)# 创建AI智能体团队teamAgentTeam(client,MODEL)# 创建DAG任务编排器绑定智能体团队orchestratorDAGTaskOrchestrator(team)# 创建3个有依赖关系的任务# 任务1分析需求无依赖t1orchestrator.add_task_with_deps({content:分析需求})# 任务2设计架构依赖任务1t2orchestrator.add_task_with_deps({content:设计架构},[t1])# 任务3编写代码依赖任务2t3orchestrator.add_task_with_deps({content:编写代码},[t2])# 打印演示信息print(f✅ 创建了 3 个任务)print(f 执行顺序:{orchestrator.get_execution_order()})print(f 可执行任务:{orchestrator.get_ready_tasks()})首先先创建Agent团队基于团队构建任务编排器DAG创建任务依赖关系2 创建 Agent 团队虽然前一章节通过TeamCollaboration创建了 Agent 的邮箱但是邮箱是用来通信的。具体 Agent 的工作信息没有存储因此需要AgentTeam类。classAgentTeam: Agent 团队 - 生产者消费者 DAG 核心管理多个AI智能体提供任务队列启动工作线程 def__init__(self,api_client,model:strgpt-5-nano):self.api_clientapi_client# OpenAI客户端self.modelmodel# 使用的大模型名称# 初始化智能体集群1个主管 3个子智能体编码/研究/测试self.leadLeadAgent(lead,model,api_client)self.sub_agents{coder:SubAgent(coder,coding,model,api_client),researcher:SubAgent(researcher,research,model,api_client),tester:SubAgent(tester,testing,model,api_client),}# 生产者-消费者模型核心任务队列存储待执行任务self.task_queuequeue.Queue()# DAG调度器管理任务依赖关系self.dag_schedulerDAGScheduler()# 存储运行中的智能体线程self.running_agents{}# 团队协作协议版本、角色、通信方式self.team_protocol{version:1.0,roles:list(self.sub_agents.keys()),communication:mailbox}defsubmit_task(self,task:Dict)-str:提交任务到团队队列生产者 :param task: 任务字典 :return: 任务ID # 生成唯一任务ID时间戳保证不重复task_idtask.get(id,ftask_{datetime.now().timestamp()})# 将任务放入队列self.task_queue.put({id:task_id,**task})returntask_iddefstart_team(self,num_workers:int3):启动智能体团队创建工作线程消费者 :param num_workers: 工作线程数量 foriinrange(num_workers):# 轮询分配子智能体coder→researcher→tester循环agent_namelist(self.sub_agents.keys())[i%len(self.sub_agents)]agentself.sub_agents[agent_name]# 创建守护线程主线程退出子线程自动退出tthreading.Thread(targetself._worker_loop,args(agent,),daemonTrue)t.start()# 记录运行中的线程self.running_agents[agent_name]tdef_worker_loop(self,agent:SubAgent):智能体工作循环消费者核心逻辑 持续从队列取任务 → 执行任务 → 标记完成 whileTrue:try:# 从队列取任务超时2秒避免无限阻塞taskself.task_queue.get(timeout2)# 子智能体执行任务resultagent.process_task(task)# 标记任务完成队列任务计数-1self.task_queue.task_done()exceptqueue.Empty:# 队列为空继续循环等待新任务continueexceptExceptionase:# 捕获异常避免线程崩溃print(f❌ [{agent.name}] Error:{e})defget_team_status(self)-Dict:获取团队运行状态return{protocol:self.team_protocol,queue_size:self.task_queue.qsize(),# 待执行任务数量agents:list(self.sub_agents.keys())# 智能体列表}第一步创建 lead 和 sub_agent.实例化这几个 Agent第二步定义任务队列.存储待执行的任务第三步DAG 调度器实例化.管理任务的依赖关系第四步实例化团队协议.用json形式进行存储协议为mailbox2.1 提交任务-生产者defsubmit_task(self,task:Dict)-str:提交任务到团队队列生产者 :param task: 任务字典 :return: 任务ID # 生成唯一任务ID时间戳保证不重复task_idtask.get(id,ftask_{datetime.now().timestamp()})# 将任务放入队列self.task_queue.put({id:task_id,**task})returntask_id2.2 Agent 工作逻辑-消费者消费逻辑为1通过轮询的方式给每一个 Agent 赋予agent_name2通过线程的方式启动sub_agent本质就是守护线程3具体处理任务的逻辑从队列取任务进行执行得到 result 后任务状态标记为完成defstart_team(self,num_workers:int3):启动智能体团队创建工作线程消费者 :param num_workers: 工作线程数量 foriinrange(num_workers):# 轮询分配子智能体coder→researcher→tester循环agent_namelist(self.sub_agents.keys())[i%len(self.sub_agents)]agentself.sub_agents[agent_name]# 创建守护线程主线程退出子线程自动退出tthreading.Thread(targetself._worker_loop,args(agent,),daemonTrue)t.start()# 记录运行中的线程self.running_agents[agent_name]tdef_worker_loop(self,agent:SubAgent):智能体工作循环消费者核心逻辑 持续从队列取任务 → 执行任务 → 标记完成 whileTrue:try:# 从队列取任务超时2秒避免无限阻塞taskself.task_queue.get(timeout2)# 子智能体执行任务resultagent.process_task(task)# 标记任务完成队列任务计数-1self.task_queue.task_done()exceptqueue.Empty:# 队列为空继续循环等待新任务continueexceptExceptionase:# 捕获异常避免线程崩溃print(f❌ [{agent.name}] Error:{e})3 DAG 构建classDAGTaskOrchestrator: DAG 任务编排器 核心基于DAG管理任务依赖保证任务按顺序执行 例如必须先分析需求→再设计架构→最后编写代码 def__init__(self,team:AgentTeam):self.teamteam# 绑定AI智能体团队self.dagDAGScheduler()# DAG调度器实例self.task_results{}# 存储任务执行结果defadd_task_with_deps(self,task:Dict,dependencies:List[str]None):添加带依赖关系的任务 :param task: 任务内容 :param dependencies: 依赖的任务ID列表 :return: 任务ID # 生成唯一任务IDtask_idtask.get(id,ftask_{datetime.now().timestamp()})# 将任务和依赖加入DAG调度器self.dag.add_task(task_id,dependencies)# 提交任务到团队队列等待执行self.team.submit_task({id:task_id,**task})returntask_iddefget_ready_tasks(self)-List[str]:获取【所有依赖已完成】的可执行任务returnself.dag.get_ready_tasks()defmark_task_done(self,task_id:str,result:strNone):标记任务完成并存储执行结果 :param task_id: 任务ID :param result: 任务执行结果 self.dag.mark_complete(task_id)ifresult:self.task_results[task_id]resultdefget_execution_order(self)-List[str]:获取DAG拓扑排序结果任务执行顺序try:returnself.dag.topological_sort()exceptValueErrorase:print(f❌ DAG Error:{e})return[]4 总结上一部分的团队协作TeamCollaboration 发消息 分配任务 记录日志它管的是谁给谁发了什么、任务分给谁、日志存哪里→ 只管任务的 “交付”不管执行顺序DAG 任务编排DAGTaskOrchestrator 控制执行顺序 管理依赖它管的是任务必须先做 A、再做 B、最后做 C→ 只管任务的 “先后规则”不管谁来执行例子合并逻辑主管发消息TeamCollaboration ↓ 创建带依赖的协作任务DAG TodoManager ↓ DAG 判断哪个任务能执行 ↓ 把可执行任务分配给智能体AgentTeam ↓ 智能体完成后 → 标记DAG完成 → 解锁下一个任务