-lgne.png)
Agent
Agent 智能体
Agent 是什么?
大语言模型(比如 ChatGPT)虽然能理解输入、分析推理、输出文字或代码,但它更像一个“聪明的对话者”——没有真正的记忆,不会主动规划,也无法动手操作现实世界的东西(比如订机票、查天气)。
而 AI Agent(智能代理)就像给语言模型装上了“大脑 + 手脚”:
- 会规划:拆解复杂任务(比如“策划一次旅行” → 自动分成查天气、订酒店、排行程等步骤)
- 能动手:调用工具(搜索/计算/订票 API/控制智能家居)
- 有记忆:长期记住你的习惯(比如你讨厌红眼航班)
- 更主动:遇到问题会自己调整方案(比如发现酒店超预算,自动找平替)
[!TIP]
举个栗子 🌰:
你对 Agent 说:“周末想带娃去上海玩,预算 5000 元”
它会自动:查天气 → 推荐亲子景点 → 比价订酒店 → 生成带地图的攻略 → 发现超预算时主动调整方案
本质上就是把大模型升级成能自主干活的智能体。
Agents 流程图
- 规划能力
智能体能够将复杂任务拆解为可执行的子任务,并动态规划执行路径。通过实时评估任务进展,自主决策继续执行或终止任务。
- 记忆系统
采用双重记忆架构:短期记忆维护任务执行上下文,随子任务动态更新并在任务完成后清除;长期记忆通过向量数据库实现知识持久化存储与检索。
- 工具调用
配备多模态工具 API(包括计算引擎、搜索引擎、代码解释器、数据库接口等),使智能体具备物理世界交互能力,解决现实场景问题。
- 执行机制
基于规划决策和记忆系统驱动具体行动,通过工具调用或环境交互实现任务目标。
举例:
[!TIP]
场景描述
用户通过智能购物助手寻找并购买适合的礼物,系统需要理解需求、推荐商品、比价并完成购买。
- 感知(Perception)
- 用户输入:
“我想给 10 岁的小男孩买生日礼物,预算 500 元左右,他喜欢科幻和机器人。” - 需求解析:
- NLP 识别关键信息:年龄(10 岁)、性别(男)、兴趣(科幻/机器人)、预算(500 元)。
- 情感分析判断用户希望挑选有惊喜感的礼物。
- 规划(Planning)
系统制定决策流程: - 商品检索:
- 在电商平台筛选"儿童科技玩具"类目,优先显示机器人/科幻相关商品。
- 推荐策略:
- 结合用户预算,排除超价商品,按销量/评分排序。
- 比价方案:
- 对比不同平台的同款商品价格和物流时效。
- 交互设计:
- 准备图文并茂的推荐列表,并附赠贺卡选项。
- 行动(Action)
- 执行检索:
找到 3 款符合条件的商品:- 可编程机器人(¥488)
- 太空主题拼装模型(¥429)
- 智能对话外星人玩偶(¥369)
- 生成推荐:
发送结构化信息:
“根据需求为您推荐以下商品(均包邮):
①【XX 机器人】编程启蒙玩具,赠 6 节课程(¥488)
②【宇宙空间站】3D 立体拼图,培养动手能力(¥429)
③【AI 外星人】智能语音互动,支持自定义表情(¥369)
您想了解哪款的详情?或需要其他建议?”
- 观察(Observation)
- 用户反馈:
“机器人那个课程具体教什么?有没有优惠券?” - 需求迭代:
- 识别新需求:商品细节查询 + 价格优化
- 调整规划:
- 调取商品详情页的教育内容
- 自动申请平台新人券(满 400 减 50)
- 计算折后价:¥488→¥438
- 循环优化
- 二次推荐:
"该机器人包含 Scratch 编程入门课程,涵盖:- 基础逻辑指令(第 1-2 课)
- 传感器应用(第 3-4 课)
- 创意项目实战(第 5-6 课)
检测到可用新人券,折后 ¥438(原价 ¥488),要直接下单吗?"
- 终止条件:
用户确认购买后,自动填写默认地址并询问:
“需要添加生日贺卡(免费)?可定制 20 字以内祝福语。”
规划(Planing)
当人类面对任务时,会自然进入以下思维流程:
[!TIP]
- 任务解析:首先思考任务的本质和完成路径。
- 工具评估:审视现有资源,规划工具的高效使用方法。
- 任务拆解:将复杂任务结构化分解为可执行的子任务(类似思维导图的分层逻辑)。
- 过程反思:执行中持续优化策略,通过经验迭代改进后续步骤。
- 终止判断:动态评估任务完成度,决策何时结束执行。
为了让智能体具备类人的任务处理思维,我们通过 LLM 提示工程构建以下核心能力:
分解任务
- 使 LLM 能够将复杂任务拆解为可独立执行的子任务模块
- 通过分层递归分解实现任务粒度控制(如:旅行规划 → 交通预订 + 酒店选择 + 行程安排)
思维链(Chain of Thoughts, CoT)
- 基础推理框架:要求 LLM「逐步思考(think step by step)」
- 实现线性推理过程(如:数学解题时显式展示演算步骤)
- 典型应用:需顺序执行的确定性任务
思维树(Tree-of-thought, ToT)
-
进阶推理框架:在 CoT 每个节点扩展多分支可能性
-
关键技术:
- 启发式评估:对分支进行可行性打分
- 搜索算法:采用 BFS/DFS 进行空间探索
- 回溯机制:动态修剪低效路径
-
典型应用:创意生成/多方案决策场景
反思与改进
Agent 也在不断“回头看”。它明白,行动中犯错是常态,但关键在于如何处理这些错误。当遇到足够“刺头”的问题,或者连续碰壁时,它会停下来,像我们整理房间一样,把之前的思路和做法好好清理一遍。看看哪里没考虑周全,哪里用力过猛,哪里本可以换个方法。这就像我们常说的“亡羊补牢”,虽然损失已经造成,但找到漏洞并修补,下次就安全多了。Agent 通过这种周期性的“复盘”和“补牢”,不断优化自己的“工具箱”,让未来的行动更少波折,结果更接近理想。
ReAct 框架
[!TIP]
ReAct 就像一个戴着思考帽子的机器人。
当它接到一个任务时,它不会立刻动手。首先,它会思考(Reasoning/Thought):它低着头,小声对自己说,“嗯,这个问题有点复杂,我需要先了解…”,它会列出一些可能的步骤,或者问自己一些问题。
想清楚之后,它就采取行动(Acting/Action):比如,它可能会去图书馆(搜索信息),或者拿起工具(执行操作)。
做完之后,它会观察(Observation)结果:“哦,原来是这样,我找到了答案。” 或者 “啊,这个方法不行。”
然后,它会根据观察到的结果,再次思考,调整计划,再次行动…
就这样,ReAct 让 AI 像人一样,边想边做,不断调整,一步步解决问题,而不是一下子就给出一个可能不对的答案。它把思考的过程都展示出来,让人也看得明白它在怎么想。
- 初始化: 模型接收初始任务或问题。
- 思考: 模型生成一个思考步骤,规划如何开始解决任务。
- 行动: 模型根据思考结果执行一个行动(如查询信息、计算等)。
- 观察: 模型接收行动的结果。
- 迭代: 模型将观察到的结果纳入新的思考中,再次进行思考、行动、观察,直到任务完成或达到某个终止条件。
任务: “告诉我 2023 年世界杯足球赛的冠军是哪个国家?”
模型(ReAct 框架下):
- Thought: 我需要查找关于 2023 年世界杯足球赛冠军的信息。我应该使用搜索引擎来获取这个事实。
- Action: 搜索[‘2023 年世界杯足球赛 冠军’]
- Observation: 搜索结果显示,2022 年卡塔尔世界杯(通常被报道为 2023 年赛事)的冠军是阿根廷队。
- Thought: 搜索结果明确了,2022 年卡塔尔世界杯的冠军是阿根廷。虽然年份标注有时会有混淆,但这是公认的事实。我可以直接回答用户。
- Action: 告诉用户“2022 年卡塔尔世界杯足球赛的冠军是阿根廷队。”
(注:这里简化了“观察”和“行动”的格式,实际实现中可能有更结构化的表示,如 Thought: ... \n Action: Search[关键词] \n Observation: 搜索结果... \n Thought: ... \n Action: Finish[最终答案]
)
记忆(Memory)
日常中的记忆机制:
-
感觉记忆:感官信息的短暂残留,仅持续几秒。
-
短期记忆:临时存储和处理少量信息,如记电话号码。
-
长期记忆:持久存储大量信息,分两类:
- 显性记忆:可主动回忆的,如个人经历(情景记忆)和知识概念(语义记忆)。
- 隐性记忆:无意识的技能和习惯,如骑自行车。
智能体中的记忆机制:
- 形成记忆:通过预训练学习世界知识,内化为长期记忆基础。
- 短期记忆:任务执行中暂存信息,任务结束后清空。
- 长期记忆:依赖外部知识库(如向量数据库)存储和检索大量信息。
工具使用(Tools/Toolkits)
Agent 能像人类一样,通过学习使用外部工具来突破自身限制。对于模型权重固定、难以更新的 LLM 来说,调用外部 API 获取实时信息、执行代码或访问专属数据源,就如同我们借助工具延伸了手脚和大脑。这种能力让 LLM 不再局限于训练时的知识,显著拓宽了其解决问题的边界。
简单使用工具
_from langchain_openai import ChatOpenAI
from langchain.agents import create_react_agent, AgentExecutor
from langchain import hub
from langchain_tavily import TavilySearch
from dotenv import load_dotenv
import os
load_dotenv()
model = "qwen-turbo-latest"
api_key = os.getenv("DASHSCOPE_API_KEY")
api_base_url = os.getenv("DASHSCOPE_BASE_URL")
# 定义llm
llm = ChatOpenAI(
model=model,
api_key=api_key,
base_url=api_base_url
)
tools = [
TavilySearch(),
]
# 使用新的Agent创建方式
prompt = hub.pull("hwchase17/react")
agent = create_react_agent(
llm,
tools, # 是否打印日志
prompt
)
agent_executor = AgentExecutor(agent=agent, tools=tools, verbose=True)
__print(agent_executor.invoke({"input": "请问现任的美国总统是谁?他的年龄的平方是多少? 请用中文告诉我这两个问题的答案"}))_
使用已有的工具
参考:https://python.langchain.com/docs/integrations/tools/
给 Agent 添加记忆
pip install langchain-tavily
# 导入必要的库
from langchain.memory import ConversationBufferMemory
from langchain_openai import ChatOpenAI
from langchain.agents import create_react_agent, AgentExecutor
from langchain_core.prompts import PromptTemplate
from langchain_tavily import TavilySearch
from langchain_core.tools import tool
import numexpr
import math
from dotenv import load_dotenv
import os
# 加载环境变量
load_dotenv()
# 定义模型参数
model = "qwen-turbo-latest"
api_key = os.getenv("DASHSCOPE_API_KEY")
api_base_url = os.getenv("DASHSCOPE_BASE_URL")
# 初始化LLM
llm = ChatOpenAI(
model=model,
api_key=api_key,
base_url=api_base_url
)
# 构建一个搜索工具
search = TavilySearch()
# 创建一个数学计算工具
@tool
def calculator(expression: str) -> str:
_"""数学计算器"""_
_ _try:
local_dict = {"pi": math.pi, "e": math.e}
result = numexpr.evaluate(expression.strip(), global_dict={}, local_dict=local_dict)
return str(result)
except Exception as e:
return f"计算错误: {str(e)}"
# 定义工具列表(直接使用现成的工具)
tools = [
search,
calculator
]
# 记忆组件
memory = ConversationBufferMemory(
memory_key="chat_history",
return_messages=True
)
# 使用最新的Agent创建方式
custom_prompt = PromptTemplate.from_template("""
你是一个有用的AI助手,能够使用工具来回答问题。你有以下工具可用:
{tools}
使用以下格式:
Question: 需要回答的输入问题
Thought: 你应该总是思考该做什么
Action: 要采取的行动,应该是[{tool_names}]中的一个
Action Input: 行动的输入
Observation: 行动的结果
... (这个Thought/Action/Action Input/Observation可以重复N次)
Thought: 我现在知道最终答案了
Final Answer: 对原始输入问题的最终答案
重要提示:
1. 如果聊天历史中有相关信息,请参考并利用这些信息
2. 聊天历史记录了之前的对话内容,你应该能够回忆和引用
聊天历史:
{chat_history}
Question: {input}
Thought: {agent_scratchpad}
""")
# 创建Agent
agent = create_react_agent(
llm,
tools, # 是否打印日志
custom_prompt
)
# 创建Agent管理器
agent_executor = AgentExecutor(agent=agent,
tools=tools,
memory=memory,
verbose=True)
# 测试对话
print("=== 第一次对话 ===")
response1 = agent_executor.invoke({"input": "美国总统现在是谁?"})
print(f"回答: {response1['output']}")
print("\n=== 第二次对话 ===")
response2 = agent_executor.invoke({"input": "好厉害,刚才我们都聊了什么?"})
print(f"回答: {response2['output']}")
print("\n=== 第三次对话(测试记忆) ===")
response3 = agent_executor.invoke({"input": "请计算 2+3*4,然后告诉我你还记得我们之前讨论过什么话题?"})
print(f"回答: {response3['output']}")
Agent 框架
根据框架和实现方式的差异,这里简单将 Agent 框架分为两大类:Single-Agent 和 Multi-Agent,分别对应单智能体和多智能体架构,Multi-Agent 使用多个智能体来解决更复杂的问题
🧠 一、Single-Agent(单智能体)框架
1. BabyAGI
-
特点:
- 基于任务列表驱动(Task List),自动规划下一步要做什么。
- 使用 LLM 生成任务、执行任务、评估结果,并决定是否需要新增任务。
-
优点:
- 简洁明了,适合学习 Agent 自主决策的基本流程。
-
缺点:
- 功能有限,无法处理复杂任务或与外部工具交互。
-
适合人群:初学者入门 Agent 架构设计。
2. AutoGPT
-
定位:个人 AI 助理,帮助完成用户指定的长期任务(如调研、写文章等)
-
核心能力:
- 支持多种外部工具(搜索引擎、浏览器、文件读写等)
- 可以自主规划任务步骤并执行
-
优点:
- 插件丰富,社区活跃
- 可视化输出清晰,可追踪每一步思考过程
-
缺点:
- 控制逻辑较弱(如不能设定最大迭代次数)
- 容易陷入无限循环或低效任务中
-
适合人群:希望快速搭建功能型 Agent 的人群。
👥 二、Multi-Agent(多智能体)框架
1. Generative Agents
-
GitHub: https://github.com/joonspk-research/generative_agents
-
论文:Generative Agents: Interactive Simulacra of Human Behavior
-
核心思想:模拟人类行为的社会互动,构建一个由多个智能体组成的虚拟社会。
-
关键特性:
- 记忆机制(Memory)
- 反思机制(Reflection)
- 计划机制(Planning)
-
优点:
- 社会仿真能力强,适合研究人类行为建模
-
缺点:
- 计算资源消耗高,部署复杂
-
适合人群:研究人员、社会模拟、游戏 AI 开发人员。
2. MetaGPT
-
中文文档支持好,社区活跃
-
核心理念:用“软件公司”的组织结构来完成项目需求。每个角色是一个 Agent。
- 产品经理、项目经理、架构师、工程师等角色各司其职。
-
主要功能:
- 输入一句话需求 → 输出 PRD、技术方案、代码等完整交付物
-
优点:
- 中文生态友好,易于本地化部署
- 角色分工明确,工程化思维强
-
缺点:
- 对任务定义格式要求较高
-
适合人群:软件工程团队、产品策划、企业级应用开发者
为什么 Agent 落地这么难?
1. 任务定义模糊 / 复杂
-
问题
用户需求不具体,如“帮我写一个网站”、“帮我做市场调研”,缺乏明确目标和边界。 -
后果
Agent 难以拆解任务,易陷入无限循环或低效执行。 -
示例
AutoGPT 可能反复搜索同一关键词,或生成大量无关内容。 -
解决方案
- 引导用户细化任务描述
- 利用意图识别技术自动解析并拆解复杂任务
2. 控制与约束能力弱
-
问题
缺乏停止条件、错误处理机制和资源限制策略。 -
后果
容易失控,执行无效步骤,浪费计算资源甚至产生费用。 -
示例
AutoGPT 默认无最大迭代次数限制,可能导致长时间运行却无产出。 -
解决方案
- 设置最大调用次数、超时机制
- 增加异常捕获与流程回滚机制
3. 外部工具集成困难
-
问题
工具接口规范不统一,数据格式多样,响应质量参差不齐。 -
后果
调用失败率高,流程中断,结果不可靠。 -
示例
Tavily 搜索返回噪音数据,LLM 解析后得出错误结论。 -
解决方案
- 统一工具接入标准(如 RESTful 接口封装)
- 引入中间层对数据进行清洗与校验
4. 记忆与上下文管理复杂
-
问题
Agent 需要记住历史状态、用户对话和操作记录。 -
后果
上下文丢失、重复操作、逻辑混乱,影响任务连续性。 -
示例
BabyAGI 使用简单列表存储记忆,检索效率低,容易遗忘关键信息。 -
解决方案
- 使用向量数据库或知识图谱增强记忆能力
- 设计上下文压缩机制提升长文本处理能力
5. 评估与反馈机制缺失
-
问题
缺乏任务完成度评估机制,无法获取用户实时反馈。 -
后果
Agent 难以自我优化,无法判断是否达成目标。 -
示例
HuggingGPT 缺乏用户反馈闭环,模型选择过程难以验证。 -
解决方案
- 引入任务评分系统(如准确率、效率等指标)
- 构建用户反馈通道,实现人机协同优化
6. 安全与隐私风险高
-
问题
可能访问敏感数据、执行危险命令、泄露用户隐私。 -
后果
存在数据泄露、恶意行为、合规性问题等风险。 -
示例
AutoGPT 支持文件读写功能,若被滥用可能破坏本地文件。 -
解决方案
- 限制权限范围(如沙箱机制)
- 加密通信、脱敏处理敏感信息
- 建立安全审计机制
7. 用户体验差
-
问题
输出冗长、重复,不符合用户预期。 -
后果
用户难理解 Agent 的思考过程,信任度低。 -
示例
MetaGPT 输出文档结构完整,但内容质量不稳定,需人工审核。 -
解决方案
- 精简输出内容,突出关键信息
- 提供可视化交互界面,提升可解释性
- 增强人机协作设计,让用户参与决策
总结一句话
[!TIP]
Agent 落地难,是因为它既需要强大的认知能力,也需要严密的工程控制、合理的任务设计和可靠的反馈机制。
目前我们正处于“从概念到产品”的过渡期,未来随着 LLM + 工程架构 + 人机协同的不断演进,Agent 将逐渐从玩具变成真正有用的生产力工具。
Funcation Calling
Function Calling 的起源
传统 LLM 的局限性
-
封闭的“黑盒子”结构:
- 传统的大模型(如 GPT-3)只能生成自然语言,不知道如何调用工具、检索信息或执行操作。
- 比如你问它“明天天气怎么样”,它可能只能胡乱猜测,因为无法连接实时天气服务。
-
缺乏可控性和可扩展性:
- 模型的推理过程不透明,无法指定它“先查再答”或“先调用数据库再总结”。
- 想让模型做一些有顺序的、需要外部操作的任务(如代码执行、数据库查询)很困难。
-
上下文长度限制 & 知识过时问题:
- 模型的知识是训练时固定的,无法更新。
- 没法自己“上网搜索”或“调用知识库 API”。
核心需求是:让大模型像程序一样,调用外部函数。
- 让模型“知道”有哪些函数可用,并“学会”在适当的时机调用它们。
OpenAI Function Calling:https://platform.openai.com/docs/guides/function-calling
Function Calling 的核心概念
核心:Function Calling 的核心是赋予大语言模型自动调用外部函数或 API 的能力。
本质:本质上,它让语言模型不仅仅基于内部知识生成答案,而是通过外挂的函数定义(函数库),自动选择合适函数,根据用户请求生成结构化调用(如 JSON),调用外部函数执行后获取真实运行结果,最终基于这些外部结果来生成更加精准、实时的回答。
[!TIP]
1.定义函数
2.大模型识别用户问题,获取到对应的工具 id、工具名称、工具参数。并决定使用哪个工具
3.手动调用第二步所需要的工具并将参数传入,得到工具的返回内容
4.将工具的返回内容重新组装提示词,重新问大模型,得到最终的回答
Function calling 和 Agent 中 tools 调用之间的区别
Function Calling 是一种模型调用单个函数的机制,而 Agent 的 Tools 调用是一种有目标、有策略的任务执行流程,可能涉及多个工具、多轮推理,Function Calling 是它的底层能力之一。
[!TIP]
案例场景
✅ 1. Function Calling(像是打电话问个问题)
模型直接调用一个函数:get_weather(“北京”)
然后得到天气结果,比如:“多云,12°C”。
再根据这个结果说:“建议穿外套。”
就像你告诉模型:“有这些按钮可以按”,它只按一个,然后给出答案。
✅ 2. Agent 调用工具(像一个助理帮你处理整个流程)
模型先调用
get_weather("北京")
→ 得到“12°C”;
然后自动再调用recommend_clothes(12°C)
→ 得到“穿毛衣”;
最后整合:“明天北京多云 12°C,建议穿毛衣。”
它自己会判断先做什么、再做什么,直到完成整个任务。
总结:Function Calling 是 Agent 能“真正动手做事”的能力。你有了 Agent,还必须给它 Function Calling,它才能把“任务规划”变成“实际操作”。
案例演示 1
注意:OpenAI 官方推荐的数据交互格式是:JSON。当数据量多的时候尽量使用 json 格式
import pandas as pd
from openai import OpenAI
from dotenv import load_dotenv
import os
import json
from io import StringIO
import numpy as np
# 加载环境变量
load_dotenv()
model = "qwen-max-0428"
api_key = os.getenv("DASHSCOPE_API_KEY")
api_base_url = os.getenv("DASHSCOPE_BASE_URL")
# 初始化OpenAI客户端
client = OpenAI(api_key=api_key, base_url=api_base_url)
# 加载样例数据 - 更丰富的员工数据
df_employees = pd.DataFrame({
'Name': ['Alice', 'Bob', 'Charlie', 'Diana', 'Eve', 'Frank'],
'Age': [25, 30, 35, 28, 32, 45],
'Salary': [50000.0, 75000.5, 95000.75, 62000.0, 88000.25, 120000.0],
'Department': ['IT', 'HR', 'IT', 'Finance', 'IT', 'Finance'],
'IsMarried': [True, False, True, False, True, True],
'YearsExperience': [3, 5, 8, 4, 7, 15]
})
# 将DataFrame转换为JSON格式,orient='split' 参数将数据、索引和列分开存储
df_json = df_employees.to_json(orient='split')
print("原始员工数据:")
print(df_employees)
print("\nJSON格式:", df_json)
"""
函数定义的规范
在使用Function Calling(函数调用)与聊天模型交互时,除了要规范输入数据格式外,函数编写也需要遵循以下核心规范:
语义清晰的命名规范
函数名应采用动宾结构,准确体现功能意图(如calculate_monthly_revenue)
避免使用模糊缩写,优先选择完整英文单词组合
结构化参数设计
参数排列遵循"核心参数优先"原则(如目标对象、主操作参数靠前)
命名采用snake_case格式,保持与函数名的语义连贯性(如start_date对应get_date_range)
自文档化描述体系
函数描述应包含三重说明:
▸ 核心功能(What)
▸ 参数约束(Why)
▸ 返回值规范(How)
每个参数需注明:
▸ 数据类型(type)
▸ 取值约束(constraints)
"""
# 函数1: 计算薪资统计信息
def calculate_salary_statistics(input_json):
_"""_
_ 计算薪资的统计信息:平均值、中位数、最高值、最低值_
_ 参数:_
_ input_json (str): 包含员工数据的JSON格式字符串_
_ 返回:_
_ str: 薪资统计信息,以JSON格式返回_
_ """_
_ _try:
# 重新将JSON数据恢复成DataFrame数据
df = pd.read_json(StringIO(input_json), orient='split')
salary_stats = {
"average_salary": round(df['Salary'].mean(), 2),
"median_salary": round(df['Salary'].median(), 2),
"max_salary": round(df['Salary'].max(), 2),
"min_salary": round(df['Salary'].min(), 2),
"salary_range": round(df['Salary'].max() - df['Salary'].min(), 2)
}
return json.dumps(salary_stats)
except Exception as e:
return json.dumps({"error": str(e)})
# 函数2: 按部门分组统计
def analyze_by_department(input_json):
_"""_
_ 按部门统计员工数量、平均薪资和平均年龄_
_ 参数:_
_ input_json (str): 包含员工数据的JSON格式字符串_
_ 返回:_
_ str: 部门统计信息,以JSON格式返回_
_ """_
_ _try:
df = pd.read_json(StringIO(input_json), orient='split')
# 根据部门进行分组,并开始计算分组后的数据
dept_stats = df.groupby('Department').agg({
'Name': 'count',
'Salary': 'mean',
'Age': 'mean'
}).round(2)
# 组转最终返回的内容
result = {}
for dept in dept_stats.index:
result[dept] = {
"employee_count": int(dept_stats.loc[dept, 'Name']), # 获取上面分组的员工数量
"average_salary": round(dept_stats.loc[dept, 'Salary'], 2),
"average_age": round(dept_stats.loc[dept, 'Age'], 2)
}
return json.dumps(result)
except Exception as e:
return json.dumps({"error": str(e)})
# 函数3: 查找满足条件的员工
def find_employees_by_criteria(input_json, min_salary=None, max_age=None, department=None):
_"""_
_ 根据指定条件查找员工_
_ 参数:_
_ input_json (str): 包含员工数据的JSON格式字符串_
_ min_salary (float): 最低薪资要求(可选)_
_ max_age (int): 最大年龄限制(可选)_
_ department (str): 指定部门(可选)_
_ 返回:_
_ str: 符合条件的员工信息,以JSON格式返回_
_ """_
_ _try:
df = pd.read_json(StringIO(input_json), orient='split')
filtered_df = df.copy()
# 应用筛选条件
if min_salary is not None:
filtered_df = filtered_df[filtered_df['Salary'] >= min_salary]
if max_age is not None:
filtered_df = filtered_df[filtered_df['Age'] <= max_age]
if department is not None:
filtered_df = filtered_df[filtered_df['Department'] == department]
# 转换为可序列化的格式
result = {
"matching_employees": filtered_df[['Name', 'Age', 'Salary', 'Department']].to_dict('records'),
"count": len(filtered_df)
}
return json.dumps(result)
except Exception as e:
return json.dumps({"error": str(e)})
# 函数4: 计算经验与薪资相关性
def analyze_experience_salary_correlation(input_json):
_"""_
_ 分析工作经验与薪资的相关性_
_ 参数:_
_ input_json (str): 包含员工数据的JSON格式字符串_
_ 返回:_
_ str: 相关性分析结果,以JSON格式返回_
_ """_
_ _try:
df = pd.read_json(StringIO(input_json), orient='split')
correlation = df['YearsExperience'].corr(df['Salary'])
# 简单的线性回归分析,工作年限与工资的变化
z = np.polyfit(df['YearsExperience'], df['Salary'], 1)
result = {
"correlation_coefficient": round(correlation, 4), # 返回结果的第一项
"correlation_strength": "strong" if abs(correlation) > 0.7 else "moderate" if abs(
correlation) > 0.3 else "weak", # 根据相关系数的绝对值大小,判断其强度
"salary_increase_per_year": round(z[0], 2), # 线性回归的斜率部分,表示 每年经验提升对应的薪资增加
"base_salary_estimate": round(z[1], 2) # 截距部分:当一个人刚入行时(0年经验),预测的起薪
}
return json.dumps(result)
except Exception as e:
return json.dumps({"error": str(e)})
# 定义函数库
# 函数库对象必须是一个字典,一个键值对代表一个函数,其中Key是代表函数名称的字符串,而value表示对应的函数。
function_repository = {
"calculate_salary_statistics": calculate_salary_statistics,
"analyze_by_department": analyze_by_department,
"find_employees_by_criteria": find_employees_by_criteria,
"analyze_experience_salary_correlation": analyze_experience_salary_correlation,
}
# 定义所有可用的工具
tools = [
{
"type": "function",
"function": {
"name": "calculate_salary_statistics",
"description": "计算员工薪资的统计信息,包括平均值、中位数、最高值、最低值等",
"parameters": {
"type": "object",
"properties": {
"input_json": {
"type": "string",
"description": "包含员工数据的JSON格式字符串"
}
},
"required": ["input_json"]
}
}
},
{
"type": "function",
"function": {
"name": "analyze_by_department",
"description": "按部门分析员工数据,统计各部门的员工数量、平均薪资和平均年龄",
"parameters": {
"type": "object",
"properties": {
"input_json": {
"type": "string",
"description": "包含员工数据的JSON格式字符串"
}
},
"required": ["input_json"]
}
}
},
{
"type": "function",
"function": {
"name": "find_employees_by_criteria",
"description": "根据指定的条件(最低薪资、最大年龄、部门)查找符合条件的员工",
"parameters": {
"type": "object",
"properties": {
"input_json": {
"type": "string",
"description": "包含员工数据的JSON格式字符串"
},
"min_salary": {
"type": "number",
"description": "最低薪资要求(可选)"
},
"max_age": {
"type": "integer",
"description": "最大年龄限制(可选)"
},
"department": {
"type": "string",
"description": "指定部门名称(可选)"
}
},
"required": ["input_json"]
}
}
},
{
"type": "function",
"function": {
"name": "analyze_experience_salary_correlation",
"description": "分析工作经验与薪资之间的相关性,提供相关系数和趋势分析",
"parameters": {
"type": "object",
"properties": {
"input_json": {
"type": "string",
"description": "包含员工数据的JSON格式字符串"
}
},
"required": ["input_json"]
}
}
}
]
def run_query(query):
_"""运行单个查询"""_
_ _print(f"\n{'=' * 60}")
print(f"查询: {query}")
print('=' * 60)
# 构建messages
messages = [
{
"role": "system",
"content": f"你是一位专业的数据分析师。现在有一个员工数据集:{df_json}。请根据用户的要求选择合适的分析函数来处理数据。"
},
{
"role": "user",
"content": query
}
]
try:
# 第一次API调用
response = client.chat.completions.create(
model=model,
messages=messages,
tools=tools,
tool_choice="auto"
)
# 检查是否有函数调用
if response.choices[0].message.tool_calls:
# 获取对应的函数用
tool_call = response.choices[0].message.tool_calls[0]
# 获取调用的函数名称
function_name = tool_call.function.name
# 获取调用的函数参数
function_args = json.loads(tool_call.function.arguments)
# 获取函数的id
tool_call_id = tool_call.id
print(f"调用函数: {function_name}")
print(f"函数参数: {function_args}")
# 执行本地函数
local_function = function_repository[function_name]
# 将参数传入函数中,并获取结果
function_response = local_function(**function_args)
print(f"函数执行结果: {function_response}")
# 添加响应到messages
messages.append(response.choices[0].message)
messages.append({
"role": "tool",
"name": function_name,
"tool_call_id": tool_call_id,
"content": function_response
})
# 第二次API调用获取最终答案
final_response = client.chat.completions.create(
model=model,
messages=messages,
)
print(f"\n最终答案: {final_response.choices[0].message.content}")
else:
print(f"直接回答: {response.choices[0].message.content}")
except Exception as e:
print(f'查询出错: {e}')
# 运行示例查询
if __name__ == "__main__":
print("开始运行数据分析查询示例...")
# 测试不同的查询示例
test_queries = [
"请计算所有员工的薪资统计信息",
"请分析各个部门的员工情况",
"请找出薪资超过80000且年龄小于35岁的IT部门员工",
"请分析工作经验与薪资的相关性"
]
for query in test_queries:
run_query(query)
演示案例 2
通过 function call 接入爬虫代码去爬取对应的车票信息
爬虫代码和对应的 city.json 文件:
import requests
import json
from prettytable import PrettyTable
class Crawl:
def __init__(self):
self.headers = {
"Accept": "*/*",
"Accept-Encoding": "gzip, deflate, br",
"Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8",
"Cache-Control": "no-cache",
"Connection": "keep-alive",
# "Cookie": "_uab_collina=174550402993338672803576; JSESSIONID=FCA64C3BDE64B5B9DEFB5DE7D29C2388; BIGipServerotn=1574502666.24610.0000; BIGipServerpassport=904397066.50215.0000; guidesStatus=off; highContrastMode=defaltMode; cursorStatus=off; route=c5c62a339e7744272a54643b3be5bf64; _jc_save_fromStation=%u957F%u6C99%2CCSQ; _jc_save_toStation=%u5317%u4EAC%2CBJP; _jc_save_toDate=2025-04-24; _jc_save_wfdc_flag=dc; _jc_save_fromDate=2025-04-26",
"Host": "kyfw.12306.cn",
"If-Modified-Since": "0",
"Pragma": "no-cache",
"Referer": "https://kyfw.12306.cn/otn/leftTicket/init?linktypeid=dc",
"src-ch-ua": "\" Not A;Brand\";v=\"99\", \"Chromium\";v=\"100\", \"Google Chrome\";v=\"100\"",
"src-ch-ua-mobile": "?0",
"src-ch-ua-platform": "\"Windows\"",
"Sec-Fetch-Dest": "empty",
"Sec-Fetch-Mode": "cors",
"Sec-Fetch-Site": "same-origin",
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/100.0.4896.127 Safari/537.36",
"X-Requested-With": "XMLHttpRequest",
"cookie": "_uab_collina=175085401511427159958854; JSESSIONID=B3B822EBDE3411AF74F3EE633BF60CF3; _qimei_uuid42=196191412141005f7c7ea9736409c9327040dfa810; _qimei_fingerprint=0f7e61cc9848385f0a6882b8e27b9359; _qimei_i_3=21de6b81c60f038997c4fb365e8620e2f1ebacf312080bd0b18c2c0e26c0286f326330943989e29fb590; _qimei_h38=; _jc_save_fromStation=%u5317%u4EAC%2CBJP; _jc_save_toStation=%u4E0A%u6D77%2CSHH; _jc_save_toDate=2025-06-25; _jc_save_wfdc_flag=dc; _jc_save_fromDate=2025-06-29; route=495c805987d0f5c8c84b14f60212447d; BIGipServerotn=1675165962.24610.0000; guidesStatus=off; highContrastMode=defaltMode; cursorStatus=off; _qimei_i_1=43e779c4ed2d"
}
def main(self, time_, start, end):
f = open('city.json', 'r', encoding='utf-8')
city = f.read()
city = json.loads(city)
start_city = city[start]
end_city = city[end]
url = 'https://kyfw.12306.cn/otn/leftTicket/queryU'
params = {
'leftTicketDTO.train_date': time_,
'leftTicketDTO.from_station': start_city,
'leftTicketDTO.to_station': end_city,
'purpose_codes': 'ADULT'
}
list_dic = []
dict_dic = []
json_data_list = requests.get(url=url, headers=self.headers, params=params).json()
json_data_lists = json_data_list['data']['result']
for i in range(0, 10):
info = json_data_lists[i].split('|')
num = info[3] # 车次
start_time = info[8] # 出发时间
end_time = info[9] # 到达时间
use_time = info[10] # 耗时
top_grade = info[32] # 特等座
first_class = info[31] # 一等
second_class = info[30] # 二等
soft_sleeper = info[23] # 软卧
hard_sleeper = info[28] # 硬卧
hard_seat = info[29] # 硬座
no_seat = info[26] # 无座
list_dic.append([
num,
start_time,
end_time,
use_time,
top_grade,
first_class,
second_class,
soft_sleeper,
hard_sleeper,
hard_seat,
no_seat
])
dict_dic.append({num: {
"出发时间": start_time,
"到达时间": end_time,
"耗时": use_time,
"特等座": first_class,
"一等": second_class,
"二等": soft_sleeper,
"软卧": hard_sleeper,
"硬卧": hard_seat,
"硬座": no_seat
}})
# pt = PrettyTable()
# pt.field_names = ["车次", "出发时间", "到达时间", "耗时", "特等座", "一等", "二等", "软卧", "硬卧", "硬座",
# "无座"]
# pt.add_rows(list_dic)
# return pt
return dict_dic
if __name__ == '__main__':
print(Crawl().main("2025-07-30", "长沙", "北京"))
大模型代码:
from openai import OpenAI
import os
from dotenv import load_dotenv
from get_train_number_info import Crawl
from datetime import datetime
import json
load_dotenv()
model = "qwen-max-2025-01-25" # 模型需要选择好一点的,不然会识别不到对应的任务
api_key = os.getenv("DASHSCOPE_API_KEY")
api_base_url = os.getenv("DASHSCOPE_BASE_URL")
client = OpenAI(api_key=api_key, base_url=api_base_url)
# 获取当前日期
def check_date():
today = datetime.now().date()
return today
# 定义函数库
# 函数库对象必须是一个字典,一个键值对代表一个函数,其中Key是代表函数名称的字符串,而value表示对应的函数。
function_repository = {
"check_train_number_info": Crawl().main,
"check_date": check_date
}
# 大模型执行
def get_llm_response(messages, model):
response = client.chat.completions.create(
model=model,
messages=messages,
temperature=0,
max_tokens=1024,
tools=[
{
"type": "function",
"function": {
"name": "check_train_number_info",
"description": "根据给定的日期查询对应的车票信息",
"parameters": {
"type": "object",
"properties": {
"time_": {
"type": "string",
"description": "日期",
},
"start": {
"type": "string",
"description": "出发站",
},
"end": {
"type": "string",
"description": "终点站",
}
},
}
}
},
{
"type": "function",
"function": {
"name": "check_date",
"description": "返回当前的日期",
"parameters": {
"type": "object",
"properties": {
}
}
}
}
]
)
return response.choices[0].message
prompt = "查询明天长沙到上海的票"
messages = [
{"role": "system", "content": "你是一个超级地图助手,你可以找到任何地址"},
{"role": "user", "content": prompt}
]
response = get_llm_response(messages, model)
messages.append(response) # 把大模型的回复加入到对话中
print("=====大模型回复1=====")
print(response)
# 如果返回的是函数调用结果,则打印出来
while (response.tool_calls is not None):
for tool_call in response.tool_calls:
args = json.loads(tool_call.function.arguments)
print("参数:", args)
# 执行本地函数
function_response = function_repository[tool_call.function.name](**args)
print("=====函数返回=====")
print(function_response)
messages.append({
"tool_call_id": tool_call.id, # 用于标识函数调用的 ID
"role": "tool",
"name": tool_call.function.name,
"content": str(function_response) # 数值result 必须转成字符串
})
print("messages:", messages)
response = get_llm_response(messages, model)
print("=====大模型回复2=====")
print(response)
messages.append(response) # 把大模型的回复加入到对话中
print("=====最终回复=====")
print(response.content)
国产大模型
- Function Calling 会成为所有大模型的标配,支持它的越来越多
- 不支持的大模型,某种程度上是不大可用的
ChatGLM
官方文档:https://github.com/THUDM/ChatGLM3/tree/main/tools_using_demo
开发平台:https://open.bigmodel.cn/console/overview
pip install zhipuai
import os
from datetime import datetime
from get_train_number_info import Crawl
from zhipuai import ZhipuAI
import json
from dotenv import load_dotenv
load_dotenv()
client = ZhipuAI(api_key=os.getenv("ZHIPU_API_KEY"))
# 获取当前日期
def check_date():
today = datetime.now().date()
return {"today": str(today)}
# 定义函数库
# 函数库对象必须是一个字典,一个键值对代表一个函数,其中Key是代表函数名称的字符串,而value表示对应的函数。
function_repository = {
"check_train_number_info": Crawl().main,
"check_date": check_date
}
# 大模型执行
def get_llm_response(messages, model: str = "glm-4-plus"):
response = client.chat.completions.create(
model=model,
messages=messages,
temperature=0,
max_tokens=1024,
tools=[
{
"type": "function",
"function": {
"name": "check_train_number_info",
"description": "根据给定的日期查询对应的车票信息",
"parameters": {
"type": "object",
"properties": {
"time_": {
"type": "string",
"description": "日期",
},
"start": {
"type": "string",
"description": "出发站",
},
"end": {
"type": "string",
"description": "终点站",
}
},
}
}
},
{
"type": "function",
"function": {
"name": "check_date",
"description": "返回当前的日期",
"parameters": {
"type": "object",
"properties": {
}
}
}
}
]
)
return response.choices[0].message
prompt = "查询后天长沙到上海的票"
messages = [
{"role": "system", "content": "你是一个超级地图助手,你可以找到任何地址"},
{"role": "user", "content": prompt}
]
response = get_llm_response(messages)
messages.append(response.model_dump()) # 把大模型的回复加入到对话中
print("=====大模型回复1=====")
print(response.model_dump())
# 如果返回的是函数调用结果,则打印出来
while (response.tool_calls is not None):
for tool_call in response.tool_calls:
args = json.loads(tool_call.function.arguments)
print("参数:", args)
# 执行本地函数
function_response = function_repository[tool_call.function.name](**args)
print("=====函数返回=====")
print(function_response)
messages.append({
"tool_call_id": tool_call.id, # 用于标识函数调用的 ID
"role": "tool",
"name": tool_call.function.name,
"content": f"{function_response}"
})
print("messages:", messages)
response = get_llm_response(messages)
print("=====大模型回复2=====")
print(response.model_dump())
messages.append(response.model_dump()) # 把大模型的回复加入到对话中
print("=====最终回复=====")
print(response.content)
DeepSeek
官方地址:https://platform.deepseek.com/usage
接口文档:https://api-docs.deepseek.com/zh-cn/guides/function_calling
from openai import OpenAI
import os
from dotenv import load_dotenv
from get_train_number_info import Crawl
from datetime import datetime
import json
load_dotenv()
model = "deepseek-chat"
api_key = os.getenv("DEEPSEEK_API_KEY")
api_base_url = os.getenv("DEEPSEEK_BASE_URL")
client = OpenAI(api_key=api_key, base_url=api_base_url)
# 获取当前日期
def check_date():
today = datetime.now().date()
return today
# 定义函数库
# 函数库对象必须是一个字典,一个键值对代表一个函数,其中Key是代表函数名称的字符串,而value表示对应的函数。
function_repository = {
"check_train_number_info": Crawl().main,
"check_date": check_date
}
# 大模型执行
def get_llm_response(messages, model):
response = client.chat.completions.create(
model=model,
messages=messages,
temperature=0,
max_tokens=1024,
tools=[
{
"type": "function",
"function": {
"name": "check_train_number_info",
"description": "根据给定的日期查询对应的车票信息",
"parameters": {
"type": "object",
"properties": {
"time_": {
"type": "string",
"description": "日期",
},
"start": {
"type": "string",
"description": "出发站",
},
"end": {
"type": "string",
"description": "终点站",
}
},
}
}
},
{
"type": "function",
"function": {
"name": "check_date",
"description": "返回当前的日期",
"parameters": {
"type": "object",
"properties": {
}
}
}
}
]
)
return response.choices[0].message
prompt = "查询后天北长沙到上海的票"
messages = [
{"role": "system", "content": "你是一个超级地图助手,你可以找到任何地址"},
{"role": "user", "content": prompt}
]
response = get_llm_response(messages, model)
messages.append(response) # 把大模型的回复加入到对话中
print("=====大模型回复1=====")
print(response)
# 如果返回的是函数调用结果,则打印出来
while (response.tool_calls is not None):
for tool_call in response.tool_calls:
args = json.loads(tool_call.function.arguments)
print("参数:", args)
# 执行本地函数
function_response = function_repository[tool_call.function.name](**args)
print("=====函数返回=====")
print(function_response)
messages.append({
"tool_call_id": tool_call.id, # 用于标识函数调用的 ID
"role": "tool",
"name": tool_call.function.name,
"content": str(function_response) # 数值result 必须转成字符串
})
print("messages:", messages)
response = get_llm_response(messages, model)
print("=====大模型回复2=====")
print(response)
messages.append(response) # 把大模型的回复加入到对话中
print("=====最终回复=====")
print(response.content)
用 Function Calling 实现 mysql 多表查询
表创建和表数据填充
CREATE TABLE customers1 (
customer_id INT AUTO_INCREMENT PRIMARY KEY COMMENT '客户ID,主键',
name VARCHAR(100) NOT NULL COMMENT '客户姓名',
email VARCHAR(100) COMMENT '客户邮箱',
phone VARCHAR(20) COMMENT '联系电话',
address TEXT COMMENT '联系地址',
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间'
) COMMENT='客户信息表';
CREATE TABLE products (
product_id INT AUTO_INCREMENT PRIMARY KEY COMMENT '商品ID,主键',
product_name VARCHAR(100) NOT NULL COMMENT '商品名称',
description TEXT COMMENT '商品描述',
price DECIMAL(10, 2) NOT NULL COMMENT '商品单价',
stock INT NOT NULL DEFAULT 0 COMMENT '库存数量',
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间'
) COMMENT='商品信息表';
CREATE TABLE orders1 (
order_id INT AUTO_INCREMENT PRIMARY KEY COMMENT '订单ID,主键',
customer_id INT NOT NULL COMMENT '关联的客户ID',
order_date DATE NOT NULL COMMENT '下单日期',
total_amount DECIMAL(10, 2) NOT NULL COMMENT '订单总金额',
status ENUM('pending', 'completed', 'cancelled') DEFAULT 'pending' COMMENT '订单状态: pending待处理, completed已完成, cancelled已取消',
FOREIGN KEY (customer_id) REFERENCES customers1(customer_id)
) COMMENT='订单主表';
CREATE TABLE order_details (
order_detail_id INT AUTO_INCREMENT PRIMARY KEY COMMENT '订单明细ID',
order_id INT NOT NULL COMMENT '关联的订单ID',
product_id INT NOT NULL COMMENT '关联的商品ID',
quantity INT NOT NULL COMMENT '购买数量',
price DECIMAL(10, 2) NOT NULL COMMENT '商品单价',
subtotal DECIMAL(10, 2) NOT NULL COMMENT '小计金额(单价 × 数量)',
FOREIGN KEY (order_id) REFERENCES orders1(order_id),
FOREIGN KEY (product_id) REFERENCES products(product_id)
) COMMENT='订单明细表,记录每个订单中包含的商品';
-- 添加两个客户
INSERT INTO customers1 (name, email, phone, address) VALUES
('张三', 'zhangsan@example.com', '13800001111', '北京市朝阳区'),
('李四', 'lisi@example.com', '13900002222', '上海市浦东新区');
-- 添加三种商品
INSERT INTO products (product_name, description, price, stock) VALUES
('iPhone 15 Pro', '苹果最新旗舰手机', 8999.00, 100),
('华为 Mate60', '国产高端智能手机', 7999.00, 150),
('小米电视 65寸', '4K超高清智能电视', 4999.00, 50);
-- 创建两个订单
INSERT INTO orders1 (customer_id, order_date, total_amount, status) VALUES
(1, '2025-06-20', 17998.00, 'completed'),
(2, '2025-06-21', 4999.00, 'pending');
-- 第一单:张三买了两部 iPhone 15 Pro
INSERT INTO order_details (order_id, product_id, quantity, price, subtotal) VALUES
(1, 1, 2, 8999.00, 17998.00);
-- 第二单:李四买了一台小米电视
INSERT INTO order_details (order_id, product_id, quantity, price, subtotal) VALUES
(2, 3, 1, 4999.00, 4999.00);
from openai import OpenAI
import os
import mysql.connector
import json
from dotenv import load_dotenv
load_dotenv()
load_dotenv()
model = "qwen-plus-2025-04-28"
api_key = os.getenv("DASHSCOPE_API_KEY")
api_base_url = os.getenv("DASHSCOPE_BASE_URL")
client = OpenAI(api_key=api_key, base_url=api_base_url)
database_schema_string = """
CREATE TABLE customers1 (
customer_id INT AUTO_INCREMENT PRIMARY KEY COMMENT '客户ID,主键',
name VARCHAR(100) NOT NULL COMMENT '客户姓名',
email VARCHAR(100) COMMENT '客户邮箱',
phone VARCHAR(20) COMMENT '联系电话',
address TEXT COMMENT '联系地址',
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间'
) COMMENT='客户信息表';
CREATE TABLE products (
product_id INT AUTO_INCREMENT PRIMARY KEY COMMENT '商品ID,主键',
product_name VARCHAR(100) NOT NULL COMMENT '商品名称',
description TEXT COMMENT '商品描述',
price DECIMAL(10, 2) NOT NULL COMMENT '商品单价',
stock INT NOT NULL DEFAULT 0 COMMENT '库存数量',
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间'
) COMMENT='商品信息表';
CREATE TABLE orders1 (
order_id INT AUTO_INCREMENT PRIMARY KEY COMMENT '订单ID,主键',
customer_id INT NOT NULL COMMENT '关联的客户ID',
order_date DATE NOT NULL COMMENT '下单日期',
total_amount DECIMAL(10, 2) NOT NULL COMMENT '订单总金额',
status ENUM('pending', 'completed', 'cancelled') DEFAULT 'pending' COMMENT '订单状态: pending待处理, completed已完成, cancelled已取消',
FOREIGN KEY (customer_id) REFERENCES customers1(customer_id)
) COMMENT='订单主表';
CREATE TABLE order_details (
order_detail_id INT AUTO_INCREMENT PRIMARY KEY COMMENT '订单明细ID',
order_id INT NOT NULL COMMENT '关联的订单ID',
product_id INT NOT NULL COMMENT '关联的商品ID',
quantity INT NOT NULL COMMENT '购买数量',
price DECIMAL(10, 2) NOT NULL COMMENT '商品单价',
subtotal DECIMAL(10, 2) NOT NULL COMMENT '小计金额(单价 × 数量)',
FOREIGN KEY (order_id) REFERENCES orders1(order_id),
FOREIGN KEY (product_id) REFERENCES products(product_id)
) COMMENT='订单明细表,记录每个订单中包含的商品';
"""
connection = mysql.connector.connect(
host='127.0.0.1', # 例如 'localhost'
port=3306, # MySQL默认端口号
user='root', # MySQL用户名
passwd='root', # MySQL用户密码
database='llm' # 要连接的数据库名
)
cursor = connection.cursor()
def get_sql_completion(messages, model):
response = client.chat.completions.create(
model=model,
messages=messages,
temperature=0, # 模型输出的随机性,0 表示随机性最小
tools=[{
"type": "function",
"function": {
"name": "ask_database",
"description": "使用这个函数来回答有关业务的用户问题。输出应该是一个完整的SQL查询",
"parameters": {
"type": "object",
"properties": {
"query": {
"type": "string",
"description": f"""
SQL查询提取信息以回答用户的问题。
SQL应该使用这个数据库架构来编写:
{database_schema_string}
查询应以纯文本形式返回,而不是JSON格式。
查询应仅包含MySQL支持的语法.
""",
}
},
"required": ["query"],
}
}
}],
)
return response.choices[0].message
def ask_database(query):
cursor.execute(query)
records = cursor.fetchall()
return records
# 定义函数库
# 函数库对象必须是一个字典,一个键值对代表一个函数,其中Key是代表函数名称的字符串,而value表示对应的函数。
function_repository = {
"ask_database": ask_database,
}
prompt = "张三买了哪些商品?"
messages = [
{"role": "system", "content": "基于表回答用户问题(表数据大部分是中文)"},
{"role": "user", "content": prompt}
]
response = get_sql_completion(messages, model)
messages.append(response)
if response.tool_calls is not None:
tool_call = response.tool_calls[0]
arguments = tool_call.function.arguments
args = json.loads(arguments)
print("====SQL====")
print(args["query"])
function_response = function_repository[tool_call.function.name](args["query"])
print("====MySQL数据库查询结果====")
print(function_response)
messages.append({
"tool_call_id": tool_call.id,
"role": "tool",
"name": "ask_database",
"content": str(function_response)
})
response = get_sql_completion(messages, model)
print("====最终回复====")
print(response.content)
生成 tools 描述的工具
import inspect
from typing import List, Callable, get_origin, get_args
import re
def tool_for_generating_descriptions(func_list: List[Callable]) -> List[dict]:
_"""_
_ 自动将函数列表转换为 OpenAI Function Call 格式的 JSON 描述_
_ Args:_
_ func_list: 函数列表_
_ Returns:_
_ 包含函数描述的字典列表_
_ """_
_ _tools_description = []
for func in func_list:
# 获取函数的签名信息
sig = inspect.signature(func)
func_params = sig.parameters
# 函数的参数描述
parameters = {
'type': 'object',
'properties': {},
'required': []
}
# 解析 docstring 中的参数描述
param_descriptions = parse_docstring_params(func.__doc__ or "")
# 获取参数和参数名称
for param_name, param in func_params.items():
# 获取参数类型
param_type = convert_python_type_to_json_schema(param.annotation)
# 获取参数描述
param_desc = param_descriptions.get(param_name, "")
# 添加参数描述和类型
parameters['properties'][param_name] = {
'type': param_type,
'description': param_desc
}
# 如果参数没有默认值,那么它是必须的
if param.default == param.empty:
parameters['required'].append(param_name)
# 函数描述字典
func_dict = {
"type": "function",
"function": {
"name": func.__name__,
"description": (func.__doc__ or "").strip().split('\n')[0] if func.__doc__ else "",
"parameters": parameters
}
}
tools_description.append(func_dict)
return tools_description
def convert_python_type_to_json_schema(annotation) -> str:
_"""_
_ 将 Python 类型注解转换为 JSON Schema 类型_
_ Args:_
_ annotation: Python 类型注解_
_ Returns:_
_ JSON Schema 类型字符串_
_ """_
_ _if annotation == inspect.Parameter.empty or annotation is None:
return "string"
# 基本类型映射
type_mapping = {
str: "string",
int: "integer",
float: "number",
bool: "boolean",
list: "array",
dict: "object",
}
# 直接类型匹配
if annotation in type_mapping:
return type_mapping[annotation]
# 处理 typing 模块的类型
origin = get_origin(annotation)
if origin is not None:
if origin is list or origin is List:
return "array"
elif origin is dict or origin is dict:
return "object"
elif origin is tuple:
return "array"
elif hasattr(annotation, '__name__') and 'Union' in annotation.__name__:
# 处理 Union 类型,取第一个非 None 类型
args = get_args(annotation)
for arg in args:
if arg is not type(None):
return convert_python_type_to_json_schema(arg)
# 处理字符串形式的类型注解
if hasattr(annotation, '__name__'):
type_name = annotation.__name__.lower()
if 'str' in type_name:
return "string"
elif 'int' in type_name:
return "integer"
elif 'float' in type_name:
return "number"
elif 'bool' in type_name:
return "boolean"
elif 'list' in type_name:
return "array"
elif 'dict' in type_name:
return "object"
# 默认返回 string
return "string"
def parse_docstring_params(docstring: str) -> dict:
_"""_
_ 从 docstring 中解析参数描述_
_ 支持 Google 风格和 NumPy 风格的 docstring_
_ Args:_
_ docstring: 函数的文档字符串_
_ Returns:_
_ 参数名到描述的映射字典_
_ """_
_ _if not docstring:
return {}
param_descriptions = {}
lines = docstring.split('\n')
# 查找参数部分
in_params_section = False
current_param = None
for line in lines:
line = line.strip()
# 检测参数部分开始
if line.lower() in ['args:', 'arguments:', 'parameters:', 'param:']:
in_params_section = True
continue
# 检测参数部分结束
if in_params_section and line.lower() in ['returns:', 'return:', 'yields:', 'raises:', 'examples:', 'example:']:
break
if in_params_section and line:
# 获取字符串中对应的字段和字段的描述
google_match = re.match(r'^\s*(\w+)\s*(?:\([^)]*\))?\s*:\s*(.*)$', line)
if google_match:
param_name, description = google_match.groups()
param_descriptions[param_name] = description.strip()
current_param = param_name
elif current_param and line.startswith(' '):
# 继续上一个参数的描述
param_descriptions[current_param] += ' ' + line.strip()
return param_descriptions
# 示例使用
if __name__ == "__main__":
# 定义一些示例函数
def calculate_area(length: float, width: float, unit: str = "m") -> float:
_"""_
_ 计算矩形面积_
_ Args:_
_ length: 矩形的长度_
_ width: 矩形的宽度 _
_ unit: 长度单位,默认为米_
_ Returns:_
_ 矩形的面积_
_ """_
_ _return length * width
def send_email(to: str, subject: str, body: str, attachments: List[str] = None):
_"""_
_ 发送邮件_
_ Args:_
_ to: 收件人邮箱地址_
_ subject: 邮件主题_
_ body: 邮件正文_
_ attachments: 附件文件路径列表,可选_
_ """_
_ _pass
def get_weather(city: str, country: str = "CN", units: str = "metric") -> dict:
_"""_
_ 获取天气信息_
_ Args:_
_ city: 城市名称_
_ country: 国家代码,默认为中国_
_ units: 温度单位,metric为摄氏度,imperial为华氏度_
_ Returns:_
_ 包含天气信息的字典_
_ """_
_ _return {}
# 生成函数调用描述
functions = [calculate_area, send_email, get_weather]
result = tool_for_generating_descriptions(functions)
# 打印结果
import json
for func_desc in result:
print(json.dumps(func_desc, indent=2, ensure_ascii=False))
print("-" * 50)
Agent 核心认知框架
Plan-and-Execute
Plan-and-Execute 是一种 AI 决策机制,分为两个核心阶段:
- Planning(规划):根据当前状态和目标,生成一个可行的任务路径或操作序列。
- Execution(执行):按计划逐步执行任务,并在每一步中评估结果,必要时进行反馈修正。
[!TIP]
[用户输入/环境状态]
↓
[任务理解]
↓
[任务分解]
↓
[生成执行计划]
↓
[逐项执行任务]
↓
[观察执行结果]
↓
[是否达成目标?] → 否 → 调整计划并继续执行
↓ 是
[返回结果]
示例:
pip install langchain-experimental
from langchain_openai import ChatOpenAI
from langchain_experimental.plan_and_execute import PlanAndExecute, load_agent_executor, load_chat_planner
from langchain_community.utilities import SerpAPIWrapper
from langchain_core.tools import Tool
from langchain.chains import LLMMathChain
from dotenv import load_dotenv
import os
load_dotenv()
model = "qwen-plus-2025-04-28"
api_key = os.getenv("DASHSCOPE_API_KEY")
api_base_url = os.getenv("DASHSCOPE_BASE_URL")
llm = ChatOpenAI(model=model, api_key=api_key, base_url=api_base_url)
# 创建工具
search = SerpAPIWrapper()
llm_math_chain = LLMMathChain(llm=llm, verbose=True)
# 定义工具列表
tools = [
Tool(
name="Search",
func=search.run,
description="用于搜索当前的价格信息、汇率或其他实时数据"
),
Tool(
name="Calculator",
func=llm_math_chain.run,
description="用于进行数学计算和预算分析"
)
]
# 加载规划器和执行器
planner = load_chat_planner(llm)
executor = load_agent_executor(llm, tools, verbose=True)
# 创建Plan and Execute代理
agent = PlanAndExecute(planner=planner, executor=executor, verbose=True)
# 运行代理解决旅行预算问题
result = agent.run("我有5000人民币的预算,想去日本旅行5天。请帮我计算一下,按照当前汇率,这些钱在东京能住几晚中等价位的酒店?")
print(result)
Self-Ask
它通过让模型在回答问题之前先进行自我询问,以分解复杂的问题或者获取更多的上下文信息,从而提高回答的准确性和相关性。这种方法尤其适用于那些需要多步推理或涉及多个知识点的问题。
假设有一个问题:“美国最高的山峰位于哪个国家公园内?”使用 Self-Ask
方法,模型可能会这样处理:
-
第一步:自我提问“美国最高的山峰是哪座山?”
- 答案:麦金利山(也称为德纳里山)。
-
第二步:接着问“麦金利山位于哪个国家公园?”
- 答案:德纳里国家公园。
通过这样的两步推理,模型能够准确地回答原始问题:“美国最高的山峰位于德纳里国家公园内。
示例:
from langchain_openai import ChatOpenAI
from langchain.agents import initialize_agent, AgentType
from langchain_community.utilities import SerpAPIWrapper
from langchain_core.tools import Tool
from dotenv import load_dotenv
import os
load_dotenv()
model = "qwen-plus-2025-07-14"
api_key = os.getenv("DASHSCOPE_API_KEY")
api_base_url = os.getenv("DASHSCOPE_BASE_URL")
llm = ChatOpenAI(model=model, api_key=api_key, base_url=api_base_url)
# 创建搜索工具
search = SerpAPIWrapper()
# 定义搜索工具
tools = [
Tool(
name="Intermediate Answer",
func=search.run,
description="用于搜索问题的中间答案。输入应该是一个搜索查询。"
)
]
# 创建Self-Ask代理
agent = initialize_agent(
tools,
llm,
agent=AgentType.SELF_ASK_WITH_SEARCH,
verbose=True,
handle_parsing_errors=True
)
# 测试代理
question = "2024年巴黎奥运会中国获得了多少枚金牌?中国在奖牌榜上排第几?"
result = agent.run(question)
print(f"代理最终答案: {result}")
Thinking and Self-Refection
思考并自我反思(Thinking and Self-Refection)框架主要用于模拟和实现复杂决策过程,通过不断自我评估和调整,使系统能够学习并改进决策过程,从而在面对复杂问题是作出更加有效的决策。
思考阶段 (Thinking Phase)
- 内部推理:模型在内部进行推理过程
- 步骤分解:将复杂问题分解为多个步骤
- 假设生成:生成多个可能的解决方案
反思阶段 (Self-Reflection Phase)
- 结果评估:评估初始答案的质量
- 错误识别:识别推理中的潜在错误
- 改进建议:提出改进方案
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from dotenv import load_dotenv
import re
import os
load_dotenv()
class ThinkingReflectionFramework:
def __init__(self, model="qwen-plus-2025-04-28", temperature=0.7):
self.llm = ChatOpenAI(model=model,
api_key=os.getenv("DASHSCOPE_API_KEY"),
base_url=os.getenv("DASHSCOPE_BASE_URL"),
temperature=temperature)
self.parser = StrOutputParser()
def create_thinking_prompt(self):
_"""创建思考阶段的提示词"""_
_ _thinking_template = """
你需要解决以下问题,请按照以下步骤进行思考:
<thinking>
1. 问题分析:仔细分析问题的核心要求
2. 知识回顾:回顾相关的知识和概念
3. 解决方案:提出可能的解决方案
4. 步骤规划:制定详细的解决步骤
</thinking>
问题:{question}
请在<thinking>标签内详细展示你的思考过程,然后给出最终答案。
"""
return ChatPromptTemplate.from_template(thinking_template)
def create_reflection_prompt(self):
_"""创建反思阶段的提示词"""_
_ _reflection_template = """
原问题:{question}
初始答案:{initial_answer}
请对上述答案进行反思和评估:
<reflection>
1. 答案准确性:分析答案是否准确回答了问题
2. 逻辑完整性:检查推理逻辑是否完整
3. 遗漏分析:是否遗漏了重要信息
4. 改进建议:提出具体的改进建议
</reflection>
基于反思,请提供一个改进后的最终答案。
"""
return ChatPromptTemplate.from_template(reflection_template)
def extract_thinking_content(self, response):
_"""提取思考内容"""_
_ _thinking_match = re.search(r'<thinking>(.*?)</thinking>', response, re.DOTALL)
if thinking_match:
return thinking_match.group(1).strip()
return ""
def extract_reflection_content(self, response):
_"""提取反思内容"""_
_ _reflection_match = re.search(r'<reflection>(.*?)</reflection>', response, re.DOTALL)
if reflection_match:
return reflection_match.group(1).strip()
return ""
def solve_with_thinking_reflection(self, question, max_iterations=2):
_"""使用思考和反思框架解决问题"""_
_ _print("=" * 60)
print("🧠 思考和自我反思框架")
print("=" * 60)
# 第一阶段:思考
print("\n🤔 思考中")
print("-" * 30)
thinking_prompt = self.create_thinking_prompt()
thinking_chain = thinking_prompt | self.llm | self.parser
initial_response = thinking_chain.invoke({"question": question})
# 提取思考过程
thinking_content = self.extract_thinking_content(initial_response)
if thinking_content:
print("思考过程:")
print(thinking_content)
print(f"\n初始答案:")
print(initial_response.replace(f"<thinking>{thinking_content}</thinking>", "").strip())
# 第二阶段:反思和改进
current_answer = initial_response
for i in range(max_iterations):
print(f"\n🔄 反思中 {i + 1}")
print("-" * 30)
reflection_prompt = self.create_reflection_prompt()
reflection_chain = reflection_prompt | self.llm | self.parser
reflection_response = reflection_chain.invoke({
"question": question,
"initial_answer": current_answer
})
# 提取反思过程
reflection_content = self.extract_reflection_content(reflection_response)
if reflection_content:
print("反思过程:")
print(reflection_content)
# 提取改进后的答案
improved_answer = reflection_response.replace(f"<reflection>{reflection_content}</reflection>", "").strip()
print(f"\n改进后答案:")
print(improved_answer)
current_answer = improved_answer
return current_answer
# 使用示例
if __name__ == "__main__":
framework = ThinkingReflectionFramework()
# 测试问题1:数学逻辑问题
question1 = "如果一个农夫有17只羊,除了9只以外都死了,那么还剩下多少只羊?"
result1 = framework.solve_with_thinking_reflection(question1)
print("\n" + "=" * 60)
print("最终结果:", result1)
# 测试问题2:复杂推理问题
print("\n" + "=" * 80)
question2 = "一个人每天工作8小时,一周工作5天。如果他的时薪是25元,那么他一个月(4周)能赚多少钱?但是如果他加班超过40小时,超出部分按1.5倍计算。假设他每周加班5小时,计算他的月收入。"
result2 = framework.solve_with_thinking_reflection(question2)
print("\n" + "=" * 60)
print("最终结果:", result2)
ReAct 框架回顾
ReAct(Reasoning and Acting)是一个结合推理(Reasoning)和行动(Acting)的语言模型框架,通过让 LLM 生成语言推理轨迹和任务行动来解决复杂问题
1. 框架定义
ReAct 代理是一个 AI 代理,使用"推理和行动"框架来结合思维链(CoT)推理与外部工具使用。ReAct 促使 LLM 为任务生成语言推理轨迹和行动,这使得系统能够执行动态推理来创建、维护和调整行动计划,同时与外部环境(如 Wikipedia)交互以整合额外信息。
2. 工作原理
ReAct 的核心是 Thought-Action-Observation 循环:
- Thought(思考):分析当前情况,规划下一步
- Action(行动):执行具体的工具调用或操作
- Observation(观察):获取行动结果并分析
[!TIP]
template = “”"你是一个乐于助人的智能助手,在回答问题时应通过推理,并在需要时调用工具。
你可以使用以下工具:
{tools}
请遵循以下格式(注意英文冒号不能省略):
以下英文对应的内容必须包含必须要有
Question: 你需要回答的输入问题
Thought: 请一步步思考如何解答这个问题
Action: 你要执行的操作,必须是以下之一 [{tool_names}]
Action Input: 传入该操作的输入
Observation: 操作执行后的返回结果
…(Thought/Action/Action Input/Observation 的过程可以重复多次)
Thought: 我现在已经知道最终答案
Final Answer: 对最初问题的最终回答
即使你觉得自己知道答案,也请优先考虑使用工具获取答案。
现在开始!
Question: {input} # 必须要有
{agent_scratchpad} # 必须要有,将之前 Agent 思考和行动的记录填充到提示词中,方便大模型去参考之前的内容去进行思考
“”"
示例:
from langchain_openai import ChatOpenAI
from langchain.agents import create_react_agent, AgentExecutor
from langchain_community.utilities import SerpAPIWrapper
from langchain_core.tools import Tool
from langchain_core.tools import tool
import numexpr
import math
from langchain_community.tools import WikipediaQueryRun
from langchain_community.utilities import WikipediaAPIWrapper
from langchain.chains import LLMMathChain
from dotenv import load_dotenv
import os
load_dotenv()
class ReActFramework:
def __init__(self, model="qwen-max-0428", temperature=0.1):
self.llm = ChatOpenAI(model=model,
api_key=os.getenv("DASHSCOPE_API_KEY"),
base_url=os.getenv("DASHSCOPE_BASE_URL"),
temperature=temperature)
self.tools = self._create_tools()
self.agent = self._create_agent()
def _create_tools(self):
_"""创建ReAct代理使用的工具集"""_
_ _# 1. 搜索工具
search = SerpAPIWrapper()
search_tool = Tool(
name="Search",
func=search.run,
description="用于搜索当前信息、新闻、实时数据等。输入应该是搜索查询。"
)
# 2. Wikipedia工具
wikipedia = WikipediaAPIWrapper(
lang="zh",
top_k_results=1,
doc_content_chars_max=800 # 限制内容长度,避免token过多
)
wikipedia = WikipediaQueryRun(api_wrapper=wikipedia,
name="维基百科搜索",
description="用于搜索维基百科信息的工具,输入应该是搜索查询"
)
# 3. 计算工具
# 创建一个数学计算工具
@tool
def calculator(expression: str) -> str:
_"""支持多个表达式的数学计算器(使用 numexpr)"""_
_ _try:
local_dict = {"pi": math.pi, "e": math.e}
expression_1, expression_2 = expression.split("\n", maxsplit=1)
expressions = [expr.strip() for expr in expression_1.split(";") if expr.strip()]
results = []
for expr in expressions:
result = numexpr.evaluate(expr, global_dict={}, local_dict=local_dict)
results.append(f"{expr} = {result}")
return "\n".join(results)
except Exception as e:
return f"计算错误: {str(e)}"
# 4. 自定义知识工具
def knowledge_base(query):
_"""简单的知识库工具"""_
_ _knowledge = {
"小猫爱学工作时间": "每周日至周五下午一点至晚上十点半",
"小猫爱学休假类型": "员工依法享有法定假,事假不计薪,病假:正式员工每年享有5天带薪病假",
"小猫爱学工作纪律": "禁止穿奇装异服,不能私自离岗"
}
# 模糊匹配
query_lower = query.lower()
for key, value in knowledge.items():
if key.lower() in query_lower:
return value
return "抱歉,我的知识库中没有关于这个主题的信息。"
knowledge_tool = Tool(
name="Knowledge",
func=knowledge_base,
description="查询内部知识库,专门用于小猫爱学相关的问题。"
)
return [search_tool, wikipedia, calculator, knowledge_tool]
def _create_agent(self):
_"""创建ReAct代理"""_
_ _# 获取ReAct提示模板
# try:
# # prompt = hub.pull("hwchase17/react")
# except:
# 如果无法从hub获取,使用自定义模板
from langchain_core.prompts import PromptTemplate
template = """你是一个乐于助人的智能助手,在回答问题时应通过推理,并在需要时调用工具。
你可以使用以下工具:
{tools}
请遵循以下格式(注意英文冒号不能省略):
Question: 你需要回答的输入问题
Thought: 请一步步思考如何解答这个问题
Action: 你要执行的操作,必须是以下之一 [{tool_names}]
Action Input: 传入该操作的输入
Observation: 操作执行后的返回结果
...(Thought/Action/Action Input/Observation 的过程可以重复多次)
Thought: 我现在已经知道最终答案
Final Answer: 对最初问题的最终回答
即使你觉得自己知道答案,也请优先考虑使用工具获取答案。
现在开始!
Question: {input}
{agent_scratchpad}
"""
prompt = PromptTemplate.from_template(template)
# 创建ReAct代理
agent = create_react_agent(self.llm, self.tools, prompt)
# 创建代理执行器
agent_executor = AgentExecutor(
agent=agent,
tools=self.tools,
verbose=True,
handle_parsing_errors=True,
max_iterations=5, # 减少迭代次数
max_execution_time=45, # 45秒超时
early_stopping_method="generate", # 早停机制
return_intermediate_steps=True # 返回中间步骤便于调试
)
return agent_executor
def solve(self, question):
_"""使用ReAct框架解决问题"""_
_ _print("🤖 ReAct Framework - Reasoning and Acting")
print("=" * 60)
print(f"❓ 问题: {question}")
print("=" * 60)
try:
result = self.agent.invoke({"input": question})
return result["output"]
except Exception as e:
return f"执行过程中出现错误: {str(e)}"
# 使用示例
def main():
# 使用LangChain的ReAct实现
print("LangChain ReAct Agent")
print("=" * 80)
react_framework = ReActFramework()
# 测试问题
questions = [
# "埃隆·马斯克现在多大了?他是哪一年出生的?",
"计算 23 * 45 + 67 的结果",
# "小猫爱学休假类型?",
# "2024年世界杯在哪个国家举办?"
]
for i, question in enumerate(questions, 1):
print(f"\n测试 {i}:")
try:
result = react_framework.solve(question)
print(f"✅ 结果: {result}")
except Exception as e:
print(f"❌ 错误: {e}")
print("-" * 60)
if __name__ == "__main__":
main()
Multi-Agent System(MAS 多 Agent)
什么是多 Agent 系统
**多智能体系统(Multi-Agent System,简称 MAS)**是由多个具有智能和自主性的个体(即 Agent)组成的系统。这些 Agent 可以是软件程序、机器人、传感器等不同类型的实体,它们各自专注于特定领域的任务,并具备独立感知、决策与执行的能力。
MAS 的核心理念在于:通过各个 Agent 之间的协作与协调,共同完成复杂任务,进而实现单个 Agent 难以独立完成的目标。这种系统能够充分发挥分布式智能的优势,在动态环境中高效应对多变的需求与挑战。
对比单个 Agent,多 Agent 的优势
- 并行处理,提高效率
多个 Agent 可以同时工作,各自处理不同的子任务,从而显著提升系统整体效率。
示例:在自动驾驶系统中,一个 Agent 处理路径规划,另一个负责障碍物检测,还有一个监控车速,互不干扰又相互配合。
- 分布式架构,降低复杂度
任务被划分为多个模块,每个 Agent 只专注于自己擅长的部分,降低了系统设计和实现的复杂度。
类似“分工合作”,每个 Agent 只做擅长的事,像一个团队完成项目。
- 可扩展性强
新增或移除 Agent 不会影响整个系统的运行,便于灵活扩展和维护。
比如需要增加新功能时,只需加入一个新的 Agent 即可,无需修改原有结构。
- 容错性和鲁棒性好
当某个 Agent 出现故障时,系统可由其他 Agent 补位或重新协调任务,系统整体不至于瘫痪。
就像团队中某个人请假,其他人可以临时顶上,保证项目继续推进。
- 促进智能协作
多个智能体可以通过通信、协商和协同计划,共同解决单一 Agent 无法完成的问题。
比如在多语言文档处理任务中,一个 Agent 负责翻译,一个负责提取关键词,另一个负责问答。
多智能体系统(MAS)的主要应用场景与实际案例
以下是常见的多 Agent 架构:
AutoGen(Microsoft)
AutoGen 是由微软推出的一个多智能体(Multi-Agent)对话式协作框架,用于构建多个 LLM(大语言模型)之间的自动协作系统。
它的目标是让多个 Agent 以自然语言进行对话交互,在自动编程、推理、数据分析等复杂任务中互相配合,实现远超单一 LLM 的效果。
左图:智能体定制
这部分展示了 AutoGen 中 Agent 的可定制性:
图中的“Conversable Agent” 是 AutoGen 的核心类,表示一个“可对话智能体”。
每个 Agent 可以被定制为具备不同能力的个体,比如:
- 接入不同 LLM(如 OpenAI GPT)
- 加载用户自定义函数(Python 工具)
- 拥有特定身份(如用户、开发者、分析师)
核心:AutoGen 允许开发者根据实际需要定制每个 Agent 的模型、工具、身份与行为逻辑。
右上:多智能体对话
这一块展示了 AutoGen 支持的 多 Agent 对话协作能力:
- 两个不同类型的 Agent(蓝色和绿色)之间进行双向沟通。
- 每个 Agent 内部可以集成不同的模型、执行器、角色设定。
核心:AutoGen 支持多个 Agent 之间自然语言对话,不再局限于“用户 ↔ 单个 LLM”的模式。
右下:灵活的对话模式
-
Joint Chat(联合对话)
- 多个 Agent 处于同一层级,平等协作、互相通信。
- 常见于头脑风暴式、专家平等协作型任务。
-
Hierarchical Chat(层级式对话)
- 存在主控 Agent(例如蓝色),负责协调多个下属 Agent。
- 下属 Agent 各自专注于某一任务(如执行、生成、总结)
- 主 Agent 是用户代理,下级有代码生成、执行、调试、总结等角色。
创建 AutoGen 只需要简单的三步:
- 定义智能体(Agent)
创建并配置不同角色的智能体,比如用户代理(User Agent)、助手代理(Assistant Agent)、监督代理(Monitor Agent)等。每个智能体都对应一个或多个大语言模型实例,负责特定的任务或角色定位。 - 设计对话策略与任务流程
设定智能体之间的交互规则和对话流程,比如谁先说话、如何传递上下文、什么时候终止对话,以及如何协同完成具体任务。这一步确保多智能体协同有序且高效。 - 运行多智能体对话(执行协同任务)
启动智能体对话循环,自动驱动智能体之间进行多轮交流,最终完成目标任务或输出结果。这个过程由框架负责管理上下文和消息流。
** 案例名称:多智能体驱动的代码流程开发系统**
模拟一个开发团队,自动按照需求从代码编程-> 代码审查-> 代码测试的流程,每个模块都由一个智能体负责。
下载模块
pip install pyautogen==0.9.0
配置文件
import os
from dotenv import load_dotenv
load_dotenv()
# API配置
CONFIG_LIST = [
{
"model": "qwen-plus-2025-01-25",
"api_key": os.getenv("DASHSCOPE_API_KEY"), # 请替换为您的API密钥
"base_url": os.getenv("DASHSCOPE_BASE_URL"),
}
]
# LLM配置
LLM_CONFIG = {
"config_list": CONFIG_LIST,
"temperature": 0.7,
"timeout": 120,
}
# 智能体配置
AGENT_CONFIG = {
"user_proxy": {
"name": "UserProxy",
"system_message": """您是用户代表,负责:
1. 接收用户需求并转达给其他智能体
2. 对任务结果进行确认和反馈
3. 决定是否需要进一步优化
""",
"human_input_mode": "NEVER",
"max_consecutive_auto_reply": 5,
"code_execution_config": {
"work_dir": "coding_output",
"use_docker": False,
},
},
"assistant": {
"name": "CodingAssistant",
"system_message": """您是专业的编程助手,负责:
1. 理解和分析编程任务需求
2. 编写高质量的代码
3. 提供详细的代码说明和注释
4. 确保代码的可执行性和安全性
请始终提供完整、可运行的代码解决方案。
""",
},
"monitor": {
"name": "CodeReviewer",
"system_message": """您是代码审查专家,负责:
1. 检查代码质量和规范性
2. 识别潜在的bug和安全问题
3. 提出改进建议
4. 确保代码符合最佳实践
对每个代码方案都要进行严格审查,并给出明确的通过/修改建议。
""",
},
"tester": {
"name": "Tester",
"system_message": """您是测试工程师,负责:
1. 设计和编写测试用例
2. 验证代码功能的正确性
3. 测试边界条件和异常情况
4. 提供测试报告和建议
""",
}
}
# 对话配置
CHAT_CONFIG = {
"max_round": 2,
"allow_repeat_speaker": False,
"manager_system_message": """您是多智能体协作的管理者,负责:
1. 协调各个智能体的对话顺序
2. 确保任务按既定流程进行
3. 监控任务完成质量
4. 在适当时机终止对话
"""
}
# 任务示例配置
SAMPLE_TASKS = {
"task1": """
请创建一个Python函数,用于处理CSV文件:
1. 读取CSV文件
2. 计算数值列的统计信息(平均值、最大值、最小值)
3. 保存处理结果到新的CSV文件
4. 包含适当的错误处理
""",
"task2": """
创建一个简单的Flask Web应用:
1. 包含主页和关于页面
2. 使用Bootstrap美化界面
3. 实现一个简单的表单提交功能
4. 包含基本的输入验证
""",
"task3": """
使用matplotlib创建数据可视化脚本:
1. 生成模拟的销售数据
2. 创建折线图显示月度趋势
3. 创建柱状图显示产品类别对比
4. 保存图表为PNG文件
"""
}
# 终止条件配置
TERMINATION_CONFIG = {
"keywords": [
"测试通过", "task completed", "任务完成", "successfully tested",
"task completed successfully", "任务圆满完成", "all tests passed",
"代码审查通过且测试完成", "final output ready"
],
"min_messages": 3
}
核心代码
import autogen
from config import (
LLM_CONFIG, AGENT_CONFIG, CHAT_CONFIG, SAMPLE_TASKS,
TERMINATION_CONFIG
)
# ========================= 第一步:定义智能体(Agent) =========================
def create_agents():
_"""创建所有智能体"""_
_ _# 1.1 创建用户代理(User Agent)
user_proxy = autogen.UserProxyAgent(
name=AGENT_CONFIG["user_proxy"]["name"],
system_message=AGENT_CONFIG["user_proxy"]["system_message"],
human_input_mode=AGENT_CONFIG["user_proxy"]["human_input_mode"],
max_consecutive_auto_reply=AGENT_CONFIG["user_proxy"]["max_consecutive_auto_reply"],
code_execution_config=AGENT_CONFIG["user_proxy"]["code_execution_config"],
)
# 1.2 创建助手代理(Assistant Agent)
assistant_agent = autogen.AssistantAgent(
name=AGENT_CONFIG["assistant"]["name"],
system_message=AGENT_CONFIG["assistant"]["system_message"],
llm_config=LLM_CONFIG,
)
# 1.3 创建监督代理(Monitor Agent)
monitor_agent = autogen.AssistantAgent(
name=AGENT_CONFIG["monitor"]["name"],
system_message=AGENT_CONFIG["monitor"]["system_message"],
llm_config=LLM_CONFIG,
)
# 1.4 创建测试代理(Tester Agent)
tester_agent = autogen.AssistantAgent(
name=AGENT_CONFIG["tester"]["name"],
system_message=AGENT_CONFIG["tester"]["system_message"],
llm_config=LLM_CONFIG,
)
return user_proxy, assistant_agent, monitor_agent, tester_agent
# ========================= 第二步:设计对话策略与任务流程 =========================
class MultiAgentWorkflow:
def __init__(self, user_proxy, assistant_agent, monitor_agent, tester_agent):
self.user_proxy = user_proxy
self.assistant_agent = assistant_agent
self.monitor_agent = monitor_agent
self.tester_agent = tester_agent
def create_group_chat(self):
_"""创建群组对话,定义智能体交互规则"""_
_ _# 定义智能体参与列表
agents = [
self.user_proxy,
self.assistant_agent,
self.monitor_agent,
self.tester_agent
]
# 设置对话流程规则
def custom_speaker_selection(last_speaker, group_chat):
_"""自定义发言顺序逻辑"""_
_ _# 获取消息列表
messages = group_chat.messages
if len(messages) == 0:
return self.user_proxy
# 获取最后一个聊天消息
last_message = messages[-1]
# 如果是用户代理发起任务,下一个是编程助手
if last_speaker == self.user_proxy:
return self.assistant_agent
# 如果编程助手完成代码,交给监督代理审查
elif last_speaker == self.assistant_agent:
if "```python" in last_message.get("content", ""):
return self.monitor_agent
else:
return self.assistant_agent
# 如果监督代理完成审查,交给测试代理
elif last_speaker == self.monitor_agent:
if "通过" in last_message.get("content", "") or "APPROVED" in last_message.get("content", ""):
return self.tester_agent
else:
return self.assistant_agent # 需要修改则回到编程助手
# 测试代理完成后,可以结束或继续优化
elif last_speaker == self.tester_agent:
group_chat.messages = []
return self.user_proxy
return None
def define_termination_condition(messages):
_"""定义任务终止条件"""_
_ _print("🔍 终止函数被调用!")
def is_task_complete():
_"""检查任务是否完成"""_
_ _if len(messages) < TERMINATION_CONFIG["min_messages"]:
return False
print(messages)
# 检查是否包含测试通过的标志
content = messages.get("content", "").lower()
if any(keyword in content for keyword in TERMINATION_CONFIG["keywords"]):
return True
return False
return is_task_complete()
# 创建群组对话
group_chat = autogen.GroupChat(
agents=agents,
messages=[],
max_round=CHAT_CONFIG["max_round"], # 最大的发言回合数量
speaker_selection_method=custom_speaker_selection, # 自定义发言规则
allow_repeat_speaker=CHAT_CONFIG["allow_repeat_speaker"], # 是否运行同一个人连续发言
)
# 创建群组对话管理器
manager = autogen.GroupChatManager(
groupchat=group_chat,
llm_config=LLM_CONFIG,
system_message=CHAT_CONFIG["manager_system_message"],
is_termination_msg=define_termination_condition
)
return manager
# ========================= 第三步:运行多智能体对话(执行协同任务) =========================
def run_multi_agent_task(task_description: str):
_"""启动多智能体协同任务"""_
_ _print("🚀 启动多智能体协同任务...")
print(f"📋 任务描述: {task_description}")
print("-" * 50)
# 创建智能体
user_proxy, assistant_agent, monitor_agent, tester_agent = create_agents()
# 创建工作流实例
workflow = MultiAgentWorkflow(user_proxy, assistant_agent, monitor_agent, tester_agent)
# 创建群组对话管理器
manager = workflow.create_group_chat()
try:
# 启动对话循环
result = user_proxy.initiate_chat(
manager,
message=f"""
新任务请求:{task_description}
请按照以下流程协同完成:
1. 编程助手:分析需求并编写代码
2. 代码审查员:审查代码质量和安全性
3. 测试工程师:编写测试用例并验证功能
4. 当测试工程师完成之后,请明确给出完成的提示
请开始执行任务。
""",
)
print("\n" + "=" * 50)
print("✅ 多智能体协同任务执行完成!")
print("=" * 50)
return result
except Exception as e:
print(f"❌ 任务执行过程中出现错误: {str(e)}")
return None
# ========================= 示例使用 =========================
if __name__ == "__main__":
# 选择要执行的任务(可以修改这里来测试不同任务)
selected_task = SAMPLE_TASKS["task1"] # 可以改为 task2, task3
print("🤖 AutoGen多智能体协同系统启动")
print("=" * 60)
# 执行任务
result = run_multi_agent_task(selected_task)
if result:
print(f"\n📊 任务执行摘要:")
print(f"- 总对话轮数: {len(result.chat_history) if hasattr(result, 'chat_history') else '未知'}")
print(f"- 任务状态: 已完成")
print(f"- 输出文件位置: ./coding_output/ 目录")
else:
print("\n❌ 任务执行失败,请检查配置和网络连接")
CrewAI
CrewAI 是一个开源的、基于 Python 的多智能体(multi-agent)协作框架,由 João Moura 创立,旨在让多个角色各异的 AI 智能体协同完成复杂任务。它通过明确的 Agents(代理)→ Tasks(任务)→ Crews(团队)→ Flows(流程) 结构,高效构建自动化流程。
核心理念:分工与协作
人类团队的强大之处在于专业分工:市场分析师负责收集数据,文案撰稿人负责撰写报告,编辑负责审校和润色。
CrewAI 将这个理念应用到了 AI 上。它认为,通过让多个“专家型”AI Agent 协同工作,每个 Agent 只专注于自己擅长的领域,可以比单个“全能型”AI Agent 更高效、更可靠地完成复杂任务。
核心组件
- Agents(代理)- 团队成员
每个智能体都有角色、目标和背景故事,可调用工具处理信息、决策和任务执行。
- Tasks(任务)- 具体工作
将复杂目标分解为代理执行的小单元任务,明确每个任务的目标和期望输出。
- Tool (工具) - 成员的技能
这些是 Agent 可以使用的函数或能力,与 LangChain 和 LlamaIndex 中的工具概念一致。CrewAI 可以无缝集成 LangChain 的工具。
- Crews(团队)- 团队本身
聚合多个代理与任务,并支持可选的执行 Process(流程),可串行、并行或响应事件驱动,处理复杂任务编排。
- Flows(流程)- 工作方式
用于频繁或高级生产场景,提供精细控制、状态管理、分支逻辑和外部集成能力,特别适合复杂业务逻辑。
案例名称:多智能体驱动的技术情报分析与内容发布系统
模拟一个技术媒体编辑部,自动完成从情报搜集 → 数据分析 → 文章撰写 → 审校 → 发布的完整流程。每个阶段由独立的智能体负责,使用多个任务进行协作,适合复杂多步骤流程建模。
下载模块
# 创建一个conda环境,**CrewAI**的依赖包很多
conda create -n crewai_env python=3.12
activate crewai_env
# 下载对应的包
pip install crewai==0.22.3 dotenv langchain==0.1.20 langchain-openai==0.0.5 langchain-core==0.1.52 langchain-community==0.0.38
import os
from datetime import datetime
from crewai import Agent, Task, Crew, Process
from langchain_community.tools.tavily_search import TavilySearchResults
from langchain_openai import ChatOpenAI
from dotenv import load_dotenv
# 加载配置
load_dotenv()
# 配置千问模型
def setup_qwen_model(api_key: str):
_"""设置千问模型"""_
_ _return ChatOpenAI(
api_key=os.getenv("DASHSCOPE_API_KEY"),
base_url="https://dashscope.aliyuncs.com/compatible-mode/v1",
model="qwen-plus-2025-01-25",
temperature=0.7
)
class TechMediaCrew:
_"""简化版技术媒体编辑部"""_
_ _def __init__(self, qwen_api_key: str):
self.llm = setup_qwen_model(qwen_api_key)
self.search_tool = TavilySearchResults()
self.setup_agents()
def setup_agents(self):
_"""设置智能体团队"""_
_ _# 情报员 - 负责搜索和收集信息
self.researcher = Agent(
role='技术情报员',
goal='搜索并收集最新的技术资讯',
backstory='你是专业的技术记者,擅长找到最新最有价值的科技新闻',
tools=[self.search_tool],
llm=self.llm,
verbose=True
)
# 分析师 - 负责分析和总结
self.analyst = Agent(
role='数据分析师',
goal='分析技术趋势,提取关键洞察',
backstory='你是经验丰富的数据分析师,能从信息中发现重要趋势',
llm=self.llm,
verbose=True
)
# 作者 - 负责写文章
self.writer = Agent(
role='科技作者',
goal='写出高质量的技术文章',
backstory='你是资深科技记者,文笔优秀,能把复杂技术写得通俗易懂',
llm=self.llm,
verbose=True
)
# 编辑 - 负责审校
self.editor = Agent(
role='主编',
goal='审校文章,确保质量',
backstory='你是严谨的主编,会仔细检查文章质量和准确性',
llm=self.llm,
verbose=True
)
def create_tasks(self, topic: str):
_"""创建工作任务"""_
_ _# 任务1: 搜索资料
research_task = Task(
description=f'''搜索关于"{topic}"的最新信息,包括:
1. 最新技术发展
2. 行业动态
3. 重要公司动向
4. 市场趋势
请搜索3-5个不同的信息源。''',
agent=self.researcher,
expected_output="搜索到的原始信息和资料"
)
# 任务2: 分析总结
analysis_task = Task(
description=f'''分析搜索到的"{topic}"相关信息,提供:
1. 关键趋势分析
2. 重要事件总结
3. 技术影响评估
4. 未来发展预测''',
agent=self.analyst,
expected_output="详细的分析报告和关键洞察",
context=[research_task]
)
# 任务3: 写文章
writing_task = Task(
description=f'''基于分析结果,写一篇关于"{topic}"的文章:
1. 标题要吸引人
2. 内容要有逻辑性
3. 语言要通俗易懂
4. 长度800-1200字
5. 包含数据和例子''',
agent=self.writer,
expected_output="完整的技术文章",
context=[research_task, analysis_task]
)
# 任务4: 编辑审校
editing_task = Task(
description='''审校文章,检查:
1. 事实准确性
2. 逻辑清晰度
3. 语言流畅性
4. 结构合理性
5. 标题和内容匹配度
如有问题请修改完善。''',
agent=self.editor,
expected_output="最终审校后的高质量文章",
context=[writing_task]
)
return [research_task, analysis_task, writing_task, editing_task]
def run_process(self, topic: str):
_"""运行完整流程"""_
_ _print(f"🚀 开始处理主题: {topic}")
print("-" * 50)
# 创建任务
tasks = self.create_tasks(topic)
# 组建团队
crew = Crew(
agents=[self.researcher, self.analyst, self.writer, self.editor],
tasks=tasks,
process=Process.sequential,
verbose=2
)
# 执行任务
start_time = datetime.now()
try:
result = crew.kickoff()
end_time = datetime.now()
duration = (end_time - start_time).total_seconds()
print(f"\n✅ 任务完成!用时: {duration:.1f}秒")
print("-" * 50)
return result
except Exception as e:
print(f"❌ 任务失败: {e}")
return None
def main():
_"""主函数"""_
_ _# 设置API密钥 (需要替换为实际密钥)
QWEN_API_KEY = "your-qwen-api-key"
# 创建编辑部
media_crew = TechMediaCrew(qwen_api_key=QWEN_API_KEY)
# 运行示例
topics = ["ChatGPT最新发展", "苹果Vision Pro", "特斯拉自动驾驶"]
for topic in topics[:1]: # 只运行一个示例
print(f"📰 技术媒体编辑部 - {topic}")
print("=" * 60)
result = media_crew.run_process(topic)
if result:
print(f"\n📄 最终文章:\n{result}")
if __name__ == "__main__":
main()