**你是否曾想过构建一个能够直接从数据库中回答特定问题的聊天机器人?**我们曾在某个项目中遇到过这样的需求。
最初,我们使用标准的 LangChain,并调用自定义工具——为每个数据库表创建单独的函数,然后手动编写 SQL 查询。结果如何?延迟高,代码库难以维护,并且随着数据库规模呈指数级增长,内存管理也一团糟。
一、我们面临的问题:
- 随着新表的添加,代码库规模迅速扩大。
- 手动生成 SQL 语句会降低运行速度并引入错误。
- 长时间对话导致内存管理出现问题
- 支持新查询需要大量的开发工作。
- 在聊天机器人的基础上添加新功能变得困难。
- 实现上下文管理(机器人记忆)既繁琐又容易出错。
后来我们发现了 LangChain 的 SQL 代理,以及 PostgreSQL 和 LangChain 如何轻松满足我们的大部分需求。在本教程中,我们将向您展示如何构建该解决方案,该方案在显著提升性能的同时,将代码库复杂度降低了 70%。
二、我们将建造什么
一个使用 LangChain、OpenAI 和 FastAPI 的完整对话式 SQL 代理,可以处理来自图书数据库的查询,例如:
- “给我看看斯蒂芬·金的所有作品”
- 哪些作者写过三本以上的书?
- 科幻小说的平均评分是多少?
该系统能够保持对话上下文,因此后续问题可以自然地进行——而且完全没有我们以前方法中存在的维护难题。
三、先决条件
- Python 3.9+
- PostgreSQL数据库
- OpenAI API密钥
- 具备 SQL 和 Python 的基础知识
四、设置
首先,安装所需的软件包:
pip install fastapi uvicorn langchain-openai langchain-community sqlalchemy psycopg2-binary langchain-postgres举例来说,假设我们有一个简单的**书店数据库,**其中包含两个表:
-- Authors table CREATE TABLE authors ( id SERIAL PRIMARY KEY, name VARCHAR(255) NOT NULL, birth_year INTEGER, nationality VARCHAR(100) ); -- Books table CREATE TABLE books ( id SERIAL PRIMARY KEY, title VARCHAR(255) NOT NULL, author_id INTEGER REFERENCES authors(id), genre VARCHAR(100), publication_year INTEGER, rating DECIMAL(3,2) );作者:存储作者的基本信息,例如姓名、出生年份和国籍。- 书籍:存储书籍详细信息,例如标题、类型、出版年份和评分,并带有指向作者的外键链接。
创建这些表并插入一些示例数据后,最好定义一个名为<view>的视图books_with_authors。该视图将两个表连接成一个统一的数据集,从而简化查询。这样,代理无需每次都编写复杂的 SQL 连接,可以直接查询该视图来获取书籍及其作者信息。
CREATE VIEW books_with_authors AS SELECT b.id AS book_id, b.title, b.genre, b.publication_year, b.rating, a.name AS author_name, a.birth_year, a.nationality FROM books b JOIN authors a ON b.author_id = a.id;步骤 1:基本 SQL 代理
现在数据库已准备就绪,我们将设置一个由 LangChain 和 OpenAI 提供支持的SQL 代理。如果没有openapikey,可以本地搭建qwen或者deepseek提供服务,可以参考这篇文章
[不用云端!Ollama 帮你在笔记本部署Qwen3,保姆级教程]
mport os from langchain_openai import ChatOpenAI from langchain_community.utilities import SQLDatabase from langchain_community.agent_toolkits import create_sql_agent, SQLDatabaseToolkit from sqlalchemy import create_engine # Setup os.environ["OPENAI_API_KEY"] = "your-openai-api-key" DB_URI = "postgresql+psycopg2://username:password@localhost:5432/bookstore" # Create database connection engine = create_engine(DB_URI) # Define custom table info for better LLM context custom_table_info = { "authors": ( "A table of authors.\n" "- id (SERIAL PRIMARY KEY): Unique ID of author\n" "- name (VARCHAR): Name of the author\n" "- birth_year (INTEGER): Year of birth\n" "- nationality (VARCHAR): Nationality of the author\n" ), "books": ( "A table of books.\n" "- id (SERIAL PRIMARY KEY): Unique ID of book\n" "- title (VARCHAR): Title of the book\n" "- author_id (INTEGER): References authors(id)\n" "- genre (VARCHAR): Genre of the book\n" "- publication_year (INTEGER): Year of publication\n" "- rating (DECIMAL): Book rating (0–5)\n" ), "books_with_authors": ( "A view combining books and authors.\n" "- book_id (INTEGER): ID of the book\n" "- title (VARCHAR): Title of the book\n" "- genre (VARCHAR): Genre of the book\n" "- publication_year (INTEGER): Year of publication\n" "- rating (DECIMAL): Rating of the book\n" "- author_name (VARCHAR): Name of the author\n" "- birth_year (INTEGER): Birth year of the author\n" "- nationality (VARCHAR): Nationality of the author\n" ), } # Initialize SQLDatabase with view support and custom info db = SQLDatabase( engine=engine, include_tables=list(custom_table_info.keys()), custom_table_info=custom_table_info, view_support=True ) # Initialize LLM llm = ChatOpenAI(model="gpt-4", temperature=0) # Create toolkit and agent toolkit = SQLDatabaseToolkit(db=db, llm=llm) agent = create_sql_agent( toolkit=toolkit, llm=llm, agent_type="tool-calling", verbose=True ) # Test it out response = agent.invoke({"input": "List all books with their authors and ratings"}) print(response["output"])以下是其内部工作原理:
- 自然语言到 SQL— 代理程序会处理类似*“列出所有书籍及其作者和评分”这样的简单英语问题。*
- SQL 生成— 它会自动生成相应的 SQL 查询(例如,从
books_with_authors视图中进行选择)。 - 执行——该查询针对 PostgreSQL 数据库运行。
- 可读输出— 然后,代理以人类易于理解的格式返回结果。
我们还通过提供自定义表信息来增强智能体的推理能力。这些元数据以自然语言描述每个表及其视图,使模型无需重复检查模式即可了解字段的含义。例如,我们会指定某个值rating是 0 到 5 之间的十进制数,或者指定某个值author_id引用了某个authors表。
启用视图支持后,我们告诉代理,视图(例如<view>books_with_authors``)应被视为一等公民。这样,代理就可以优先查询视图,而不是每次都重新创建连接逻辑,从而使查询更简洁、更可靠。
控制台输出:
agent的回复
步骤 2:添加原始结果回调
有时您需要访问原始 SQL 查询结果以进行进一步处理。让我们添加一个回调处理程序来捕获此信息:
rom langchain.callbacks.base import BaseCallbackHandler class SQLResultHandler(BaseCallbackHandler): """Callback handler to capture raw SQL query results""" def __init__(self): self.latest_sql_result = None self.sql_run_ids = set() def on_tool_start(self, serialized, input_str, **kwargs): """Track SQL tool starts""" tool_name = serialized.get('name', 'unknown') if isinstance(serialized, dict) else str(serialized) if tool_name == "sql_db_query": run_id = kwargs.get('run_id') self.sql_run_ids.add(run_id) def on_tool_end(self, output, **kwargs): """Capture SQL tool output""" run_id = kwargs.get('run_id') parent_run_id = kwargs.get('parent_run_id') # Check if this is a SQL tool end if run_id in self.sql_run_ids or parent_run_id in self.sql_run_ids: self.latest_sql_result = output # Clean up run IDs self.sql_run_ids.discard(run_id) self.sql_run_ids.discard(parent_run_id) def on_tool_error(self, error, **kwargs): """Clean up on SQL tool errors""" run_id = kwargs.get('run_id') self.sql_run_ids.discard(run_id) def get_latest_result(self): """Get the most recent SQL result""" return self.latest_sql_result def reset(self): """Reset for next query""" self.latest_sql_result = None self.sql_run_ids = set() # Usage with callback sql_handler = SQLResultHandler() response = agent.invoke( {"input": "Show me all science fiction books"}, {"callbacks": [sql_handler]} ) print("Agent Response:", response["output"]) print("Raw SQL Result:", sql_handler.get_latest_result())LangChain 的回调系统允许您接入代理执行的不同阶段。我们的方案SQLResultHandler专门用于捕获 SQL 数据库查询的输出,从而使我们能够同时访问代理的自然语言响应和原始数据。
步骤 3:添加对话记忆
现在让我们添加内存,以便我们的代理可以处理后续问题并保持上下文:
from langchain_postgres import PostgresChatMessageHistory from langchain.memory import ConversationBufferMemory import psycopg # Connection for chat history (separate from main DB) CHAT_HISTORY_DB = "postgresql://username:password@localhost:5432/bookstore" CHAT_HISTORY_TABLE = "chat_history" #the table that will store our hisory CHAT_HISTORY_CONN = psycopg.connect(CHAT_HISTORY_DB)我们还需要告诉 LangChain 创建用于存储聊天记录的表。这只需要执行一次,之后您可以注释掉或删除这段代码。
# Run this only once try: PostgresChatMessageHistory.create_tables(CHAT_HISTORY_CONN, CHAT_HISTORY_TABLE) print(f"Chat history table '{CHAT_HISTORY_TABLE}' created or already exists") except Exception as e: print(f"Note: {e}")之后,我们会获取对话历史记录并将其转换为客服人员易于理解的格式。这PostgresChatMessageHistory需要session_id将数据转换为 UUID 字符串。
async def get_session_history(session_id: str) -> PostgresChatMessageHistory: """Get chat history for a session""" async_conn = await psycopg.AsyncConnection.connect(CHAT_HISTORY_CONN) return PostgresChatMessageHistory( CHAT_HISTORY_TABLE, session_id, async_connection=async_conn ) async def get_memory(session_id: str) -> ConversationBufferMemory: """Create memory with PostgreSQL backing""" chat_history = await get_session_history(session_id) return ConversationBufferMemory( chat_memory=chat_history, memory_key="history", return_messages=True ) async def format_history(chat_history, max_messages: int = 6): """Format recent chat history for context""" messages = await chat_history.aget_messages() recent_messages = messages[-max_messages:] if len(messages) > max_messages else messages formatted = [] for msg in recent_messages: role = "User" if msg.type == "human" else "Assistant" formatted.append(f"{role}: {msg.content}") return "\n".join(formatted) async def create_agent_with_memory(session_id: str): """Create agent with conversation memory""" memory = await get_memory(session_id) # Get formatted history for context readable_history = await format_history(memory.chat_memory, 6) # Custom prompt with history custom_prefix = f""" You are a helpful assistant that can answer questions about a bookstore database. You have access to information about books and authors. Previous conversation context: {readable_history} Be concise and helpful in your responses. """ return create_sql_agent( toolkit=toolkit, llm=llm, agent_type="tool-calling", prefix=custom_prefix, agent_executor_kwargs={"memory": memory}, verbose=True ) # Usage with memory import asyncio async def chat_example(): agent = await create_agent_with_memory("3dc035ae-bc72-4d5a-8569-c87c10aab97f") # Must be a UUID # First question response1 = await agent.ainvoke({"input": "How many books by Jane Austen do we have?"}) print("Response 1:", response1["output"]) # Follow-up question (will remember context) response2 = await agent.ainvoke({"input": "What genres are they?"}) print("Response 2:", response2["output"]) # Run the example asyncio.run(chat_example())内存系统将对话历史记录存储在 PostgreSQL 数据库中,使代理能够:
- 记住之前的问题和答案
- 自然地应对后续问题
- 在多次交互中保持上下文关联
- 使用会话 ID 扩展到多个用户/会话
步骤 4:FastAPI Web 服务
最后,为了方便部署,我们将所有内容封装到一个 FastAPI 应用程序中:
from fastapi import FastAPI from pydantic import BaseModel from typing import Optional app = FastAPI(title="SQL Chat Agent", version="1.0.0") class ChatRequest(BaseModel): message: str user_id: str class ChatResponse(BaseModel): reply: str raw_sql_result: Optional[str] = None @app.post("/chat", response_model=ChatResponse) async def chat_endpoint(request: ChatRequest): """Chat with the SQL agent""" # Create handler for raw results sql_handler = SQLResultHandler() # Create agent with memory for this user agent = await create_agent_with_memory(request.user_id) # Process the question response = await agent.ainvoke( {"input": request.message}, {"callbacks": [sql_handler]} ) return ChatResponse( reply=response["output"], raw_sql_result=sql_handler.get_latest_result() ) if __name__ == "__main__": import uvicorn uvicorn.run(app, host="0.0.0.0", port=8000)五、运行应用程序
保存代码main.py并运行:
uvicorn main:app --reload您的 API 将可通过以下方式访问http://localhost:8000:。您可以使用以下命令进行测试:
curl -X POST "http://localhost:8000/chat" \ -H "Content-Type: application/json" \ -d '{ "message": "How many authors do we have?",, "user_id": "3dc035ae-bc72-4d5a-8569-c87c10aab97f" }'**注意:**该
user_id字段必须始终是有效的 UUID,否则 LangChain 将抛出错误。
六、测试您的代理
请尝试以下示例查询:
- “请显示2020年以后出版的所有书籍”
- “哪位作者的图书平均评分最高?”
- “列出评分高于 4.0 的科幻小说”
- “《闪灵》的作者是谁?”(接着问:“他们还写过哪些其他书籍?”)
七、完整工作示例
以下是整合所有功能的完整代码:
import os import asyncio from typing import Optional from fastapi import FastAPI from pydantic import BaseModel from sqlalchemy import create_engine from langchain_openai import ChatOpenAI from langchain_community.utilities import SQLDatabase from langchain_community.agent_toolkits import create_sql_agent, SQLDatabaseToolkit from langchain_postgres import PostgresChatMessageHistory from langchain.memory import ConversationBufferMemory from langchain.callbacks.base import BaseCallbackHandler import psycopg # Configuration os.environ["OPENAI_API_KEY"] = "your-openai-api-key" DB_URI = "postgresql+psycopg2://username:password@localhost:5432/bookstore" CHAT_HISTORY_CONN = "postgresql://username:password@localhost:5432/bookstore" CHAT_HISTORY_TABLE = "chat_history" # Database setup engine = create_engine(DB_URI) # Define custom table info for better LLM context custom_table_info = { "authors": ( "A table of authors.\n" "- id (SERIAL PRIMARY KEY): Unique ID of author\n" "- name (VARCHAR): Name of the author\n" "- birth_year (INTEGER): Year of birth\n" "- nationality (VARCHAR): Nationality of the author\n" ), "books": ( "A table of books.\n" "- id (SERIAL PRIMARY KEY): Unique ID of book\n" "- title (VARCHAR): Title of the book\n" "- author_id (INTEGER): References authors(id)\n" "- genre (VARCHAR): Genre of the book\n" "- publication_year (INTEGER): Year of publication\n" "- rating (DECIMAL): Book rating (0–10)\n" ), "books_with_authors": ( "A view combining books and authors.\n" "- book_id (INTEGER): ID of the book\n" "- title (VARCHAR): Title of the book\n" "- genre (VARCHAR): Genre of the book\n" "- publication_year (INTEGER): Year of publication\n" "- rating (DECIMAL): Rating of the book\n" "- author_name (VARCHAR): Name of the author\n" "- birth_year (INTEGER): Birth year of the author\n" "- nationality (VARCHAR): Nationality of the author\n" ), } # Initialize SQLDatabase with view support and custom info db = SQLDatabase( engine=engine, include_tables=list(custom_table_info.keys()), custom_table_info=custom_table_info, view_support=True ) llm = ChatOpenAI(model="gpt-4", temperature=0) toolkit = SQLDatabaseToolkit(db=db, llm=llm) # Basic Callback Handler class SQLResultHandler(BaseCallbackHandler): def __init__(self): self.latest_sql_result = None self.sql_run_ids = set() def on_tool_start(self, serialized, input_str, **kwargs): tool_name = serialized.get('name', 'unknown') if isinstance(serialized, dict) else str(serialized) if tool_name == "sql_db_query": self.sql_run_ids.add(kwargs.get('run_id')) def on_tool_end(self, output, **kwargs): run_id = kwargs.get('run_id') if run_id in self.sql_run_ids: self.latest_sql_result = output self.sql_run_ids.discard(run_id) def get_latest_result(self): return self.latest_sql_result # Memory Handling async def get_session_history(session_id: str): async_conn = await psycopg.AsyncConnection.connect(CHAT_HISTORY_CONN) return PostgresChatMessageHistory(CHAT_HISTORY_TABLE, session_id, async_connection=async_conn) async def get_memory(session_id: str): chat_history = await get_session_history(session_id) return ConversationBufferMemory(chat_memory=chat_history, memory_key="history", return_messages=True) # Agent Creation async def create_agent_with_memory(session_id: str): memory = await get_memory(session_id) return create_sql_agent( toolkit=toolkit, llm=llm, agent_type="tool-calling", agent_executor_kwargs={"memory": memory}, verbose=True ) # FastAPI app app = FastAPI(title="SQL Chat Agent") # Models class ChatRequest(BaseModel): message: str user_id: str class ChatResponse(BaseModel): reply: str raw_sql_result: Optional[str] = None # API Endpoint @app.post("/chat", response_model=ChatResponse) async def chat_endpoint(request: ChatRequest): sql_handler = SQLResultHandler() agent = await create_agent_with_memory(request.user_id) response = await agent.ainvoke( {"input": request.message}, {"callbacks": [sql_handler]} ) return ChatResponse( reply=response["output"], raw_sql_result=sql_handler.get_latest_result() ) # Execution if __name__ == "__main__": import uvicorn uvicorn.run(app, host="0.0.0.0", port=8000)请求示例:
{ "message": "List all books with their authors and ratings", "user_id": "44b11b50-9417-4fa5-8e5d-ea968c6dc7d1" }agent回复:
{ "reply": "Here are some books with their authors and ratings:\n\n1. 'One Hundred Years of Solitude' by Gabriel García Márquez - Rating: 4.90\n2. '1984' by George Orwell - Rating: 4.80\n3. 'Pride and Prejudice' by Jane Austen - Rating: 4.70\n4. 'Animal Farm' by George Orwell - Rating: 4.60\n5. 'Half of a Yellow Sun' by Chimamanda Ngozi Adichie - Rating: 4.60\n6. 'Kafka on the Shore' by Haruki Murakami - Rating: 4.50\n7. 'Emma' by Jane Austen - Rating: 4.50\n8. 'Americanah' by Chimamanda Ngozi Adichie - Rating: 4.40\n9. 'Adventures of Huckleberry Finn' by Mark Twain - Rating: 4.40\n10. 'Norwegian Wood' by Haruki Murakami - Rating: 4.30", "raw_sql_result": "[('One Hundred Years of Solitude', 'Gabriel García Márquez', Decimal('4.90')), ('1984', 'George Orwell', Decimal('4.80')), ('Pride and Prejudice', 'Jane Austen', Decimal('4.70')), ('Animal Farm', 'George Orwell', Decimal('4.60')), ('Half of a Yellow Sun', 'Chimamanda Ngozi Adichie', Decimal('4.60')), ('Kafka on the Shore', 'Haruki Murakami', Decimal('4.50')), ('Emma', 'Jane Austen', Decimal('4.50')), ('Americanah', 'Chimamanda Ngozi Adichie', Decimal('4.40')), ('Adventures of Huckleberry Finn', 'Mark Twain', Decimal('4.40')), ('Norwegian Wood', 'Haruki Murakami', Decimal('4.30'))]" }八、主要优势
这种方法可以让你:
- 自然语言界面:用户可以用简单的英语提问
- 对话记忆:在多个问题中保持上下文关联
- 原始数据访问:回调提供对底层 SQL 结果的访问,或者在代理呼叫后您想要执行的任何其他操作。
- REST API:轻松与 Web 应用、移动应用或其他服务集成
- 可扩展:支持多个并发用户,并提供会话管理功能。
您的 API 将可供测试http://localhost:8000!
就、结论
现在,您拥有了一个功能齐全的对话式 SQL 代理,它可以通过自然语言处理复杂的数据库查询。模块化设计使其易于扩展,例如:
- 速率限制和身份验证
- 查询结果缓存
- 支持多种数据库
- 自定义响应格式
LangChain 的 SQL 代理功能与 FastAPI 的现代 Web 框架相结合,为构建智能数据库接口奠定了强大的基础。
如何学习大模型 AI ?
由于新岗位的生产效率,要优于被取代岗位的生产效率,所以实际上整个社会的生产效率是提升的。
但是具体到个人,只能说是:
“最先掌握AI的人,将会比较晚掌握AI的人有竞争优势”。
这句话,放在计算机、互联网、移动互联网的开局时期,都是一样的道理。
我在一线互联网企业工作十余年里,指导过不少同行后辈。帮助很多人得到了学习和成长。
我意识到有很多经验和知识值得分享给大家,也可以通过我们的能力和经验解答大家在人工智能学习中的很多困惑,所以在工作繁忙的情况下还是坚持各种整理和分享。但苦于知识传播途径有限,很多互联网行业朋友无法获得正确的资料得到学习提升,故此将并将重要的AI大模型资料包括AI大模型入门学习思维导图、精品AI大模型学习书籍手册、视频教程、实战学习等录播视频免费分享出来。
第一阶段(10天):初阶应用
该阶段让大家对大模型 AI有一个最前沿的认识,对大模型 AI 的理解超过 95% 的人,可以在相关讨论时发表高级、不跟风、又接地气的见解,别人只会和 AI 聊天,而你能调教 AI,并能用代码将大模型和业务衔接。
- 大模型 AI 能干什么?
- 大模型是怎样获得「智能」的?
- 用好 AI 的核心心法
- 大模型应用业务架构
- 大模型应用技术架构
- 代码示例:向 GPT-3.5 灌入新知识
- 提示工程的意义和核心思想
- Prompt 典型构成
- 指令调优方法论
- 思维链和思维树
- Prompt 攻击和防范
- …
第二阶段(30天):高阶应用
该阶段我们正式进入大模型 AI 进阶实战学习,学会构造私有知识库,扩展 AI 的能力。快速开发一个完整的基于 agent 对话机器人。掌握功能最强的大模型开发框架,抓住最新的技术进展,适合 Python 和 JavaScript 程序员。
- 为什么要做 RAG
- 搭建一个简单的 ChatPDF
- 检索的基础概念
- 什么是向量表示(Embeddings)
- 向量数据库与向量检索
- 基于向量检索的 RAG
- 搭建 RAG 系统的扩展知识
- 混合检索与 RAG-Fusion 简介
- 向量模型本地部署
- …
第三阶段(30天):模型训练
恭喜你,如果学到这里,你基本可以找到一份大模型 AI相关的工作,自己也能训练 GPT 了!通过微调,训练自己的垂直大模型,能独立训练开源多模态大模型,掌握更多技术方案。
到此为止,大概2个月的时间。你已经成为了一名“AI小子”。那么你还想往下探索吗?
- 为什么要做 RAG
- 什么是模型
- 什么是模型训练
- 求解器 & 损失函数简介
- 小实验2:手写一个简单的神经网络并训练它
- 什么是训练/预训练/微调/轻量化微调
- Transformer结构简介
- 轻量化微调
- 实验数据集的构建
- …
第四阶段(20天):商业闭环
对全球大模型从性能、吞吐量、成本等方面有一定的认知,可以在云端和本地等多种环境下部署大模型,找到适合自己的项目/创业方向,做一名被 AI 武装的产品经理。
- 硬件选型
- 带你了解全球大模型
- 使用国产大模型服务
- 搭建 OpenAI 代理
- 热身:基于阿里云 PAI 部署 Stable Diffusion
- 在本地计算机运行大模型
- 大模型的私有化部署
- 基于 vLLM 部署大模型
- 案例:如何优雅地在阿里云私有部署开源大模型
- 部署一套开源 LLM 项目
- 内容安全
- 互联网信息服务算法备案
- …
学习是一个过程,只要学习就会有挑战。天道酬勤,你越努力,就会成为越优秀的自己。
如果你能在15天内完成所有的任务,那你堪称天才。然而,如果你能完成 60-70% 的内容,你就已经开始具备成为一名大模型 AI 的正确特征了。