Modular RAG
在完成 Naive RAG 的基础构建与 Advanced RAG 的链路优化后,我们正式进入Modular RAG (模块化 RAG) 章节。
不同于以往线性的、固定的 Pipeline,Modular RAG 引入智能调度、持久化 存储与智能增量加载机制的概念。系统根据用户意图,通过 Qwen3-0.6B 微型路由器动态编排处理路径。在 DeepSeek 论文集的实测中,系统首次构建需数分钟,但二次启动仅需秒级,且具备极强的抗噪能力与逻辑推理能力。
一、架构概览
Modular RAG 不再是线性的流水线,而是一个具备“记忆”与“自检”能力的智能系统。它通过 Qwen3-0.6B 路由器动态编排路径,并实现了宏观图谱与微观向量的完美融合。
- 数据层:
- 内置示例数据集:平台在内置路径中预置了 “DeepSeek” 论文数据集。用户无需繁琐的数据准备,即可通过调用内置库实现 “一键上手”,快速验证 RAG 流程的闭环性。
- 私有数据接入:系统具备高度的灵活性,支持用户通过数据通道(SCP)自行上传 PDF、Markdown 等私有文档。通过简单的路径配置,即可实现从公共知识到行业私有知识的无缝切换。
- 深度清洗加固:针对学术 PDF 中常见的数学符号与乱码(如 \ud835),在 Embedding 前执行强制编码清洗(utf-8 ignore + decode),并在对象重建层面修复了 node_parser 的崩溃风险。
- 推理层:
- 核心引擎:采用 vLLM 作为推理后端,利用其 PagedAttention 技术提升并发处理能力。
- 启 动策略:执行显存割让策略,为后续的向量化和精排预留充足的计算余裕,确保单卡环境下多模型的稳定运行。
- 多模型协同架构:在双卡环境下通过显存分片同时调度 Qwen3-8B 负责核心推理、Qwen3-Embedding-8B 负责高并发向量化(Batch Size=30+),Qwen3-0.6B 负责毫秒级路由。
- 数据存储与检索层:
- 智能增量加载:系统默认关闭 FORCE_REBUILD。启动时优先检测 Milvus 本地库(.db)与图谱存储(graph_storage),若存在则直接加载索引,跳过耗时的 Embedding 过程。
- 异构双路索引:
- Milvus Lite:存储 740+ 页的全量切片向量,负责“细节查准”。
- PropertyGraph:存储基于论文核心摘要生成的逻辑三元组,负责“关系查全”。
- 逻辑编排层:
- 意图路由 (Router):精准识别 META_QUERY(统计)、GRAPH_QUERY(逻辑)、SUMMARY(总结)与 TECH_QUERY(细节)。
- 宏微观混合检索:
- 微观:通过 HyDE(假设文档)增强向量检索,定位具体参数。
- 宏观:通过摘要级图谱,定位论文间的演进与对比关系。
- 流程图:
-
文档:

-
代码:

-
二、优化步骤
为了实现超越 Advanced RAG 的表现,我们在代码中实施了以下三大模块化进化策略:
1:摘要级图谱构建
系统不再盲目检索,而是先通过“路由”模型判定用户到底想要什么,从而实现响应效率与准确性的双重提升。
- 痛点:传统 Graph RAG 尝试对 740 个页面切片逐一提取三元组,导致构建耗时极长且图谱充斥着无关琐碎细节。
- 优化逻辑:
- 按文归并:代码自动将 740 个切片按 file_name 归并为 24 组。
- 摘要生成:先调用 LLM 对每篇论文的前 5-7 页生成“高浓缩逻辑摘要”。
- 图谱构建:仅针对这 24 个摘要节点 提取三元组。
- 效果:构建速度提升 10 倍以上,且图谱关系(如 DeepSeek-V3 -> 改进 -> V2)更加清晰、宏观,极大地提升了回答“演进关系”类问题的质量。
2:智能路由与元数据直通
让系统“听懂”用户在问什么,而不仅仅是做关键词匹配。
- 元数据直通(Meta Query):当用户问“有多少篇论文”时,系统不经过 LLM,直接扫描 Document Metadata 进行 Python 级统计,实现了 0 幻觉 的精准统计回答。
- 意图分流:
- Graph Query:触发图谱检索,回答“架构区别”、“演进路线”。
- Tech Query:触发 HyDE + 向量检索,回答“参数细节”、“实验数据”。
3:双流混合检索与重排序
- HyDE 预处理:在向量检索前,让 LLM 先“脑补”一段假设性答案,用生成的答案去检索,解决了学术提问太短(如“MTP 参数?”)导致的召回失败。
- LLM 重排序:从向量库和图谱库中召回 Top-15 结果后,使用 LLMRerank 进行语义打分,只保留相关度最高的 Top-5 进入最终生成环节,有效减少了上下文窗口的噪声。
4:持久化
- Milvus 文件锁处理:解决了 Milvus Lite 独占文件锁导致的冲突问题,通过在主循环中集成 debug 指令,实现了在不退出服务的情况下动态查看底层数据库状态。
三、沐曦 (MetaX) 部署指南
本章节适用于 曦云 C500 等沐曦系列算力卡。
1. 硬件与基础环境
- 算力型号:曦云 C500 (64GB) * 2
- 算力主机:
-
jiajia-mxc:
vLLM / vllm:0.11.0 / Python 3.10 / maca 3.3.0.11
-
suanfeng-mxc:
vLLM / vllm:0.13.0 / Python 3.10 / maca 3.3.0.303
-
2. 基础步骤
-
进入算力容器,启动实例后,点击 JupyterLab 进入工作台。

3. 实现步骤
3.1 下载 LlamaIndex 与 Milvus Lite 框架
-
创建终端窗口(Terminal)

-
输入代码:
pip install --target /data/llama_libs --no-deps -i https://mirrors.aliyun.com/pypi/simple/ -U \
"pymilvus==2.6.6" milvus-lite orjson minio pathspec python-dateutil pytz six \
llama-index-core llama-index-readers-file llama-index-llms-openai llama-index-llms-openai-like \
llama-index-embeddings-huggingface llama-index-vector-stores-milvus llama-index-postprocessor-sbert-rerank \
llama-index-instrumentation llama-index-workflows llama-index-utils-workflow \
llama-index-retrievers-bm25 rank-bm25 bm25s PyStemmer \
sentence-transformers pypdf docx2txt nest-asyncio ujson grpcio google-api-core protobuf banks griffe sqlalchemy dataclasses-json marshmallow typing-inspect fsspec filetype deprecated wrapt dirtyjson tenacity jinja2 pyyaml \
pandas numpy nltk tiktoken requests charset-normalizer urllib3 certifi idna sniffio anyio h11 httpcore httpx mypy_extensions typing_extensions scikit-learn scipy joblib threadpoolctl tqdm pyarrow \
ragas langchain-core langchain-openai langsmith requests_toolbelt "numpy<2.0" uuid_utils tenacity regex appdirs instructor docstring_parser langchain_community llama-index-llms-huggingface jsonpatch
pip install griffe -t /data/llama_libs
pip install tinytag -t /data/llama_libs
pip install accelerate -
完成下载后,新建一个新的终端:

3.2 启动 vLLM 推理
-
在新的终端内输入代码:
CUDA_VISIBLE_DEVICES=0 vllm serve /mnt/moark-models/Qwen3-8B --gpu-memory-utilization 0.7 --port 8000 -
当终端提示
INFO: Application startup compete,则完成vLLM启动步骤。
3.3 创建并运行 Python 脚本
-
点击 Python File:

-
输入代码:
import sys, os, asyncio, nest_asyncio, torch, shutil, logging
from transformers import AutoModelForCausalLM, AutoTokenizer
from tqdm.asyncio import tqdm # 引入进度条库
# --- 0. 日志降噪 (屏蔽 HTTP 刷屏) ---
logging.getLogger("httpx").setLevel(logging.WARNING)
logging.getLogger("httpcore").setLevel(logging.WARNING)
logging.getLogger("openai").setLevel(logging.WARNING)
# 1. 环境初始化与路径保护
PRIVATE_LIB = "/data/llama_libs"
if PRIVATE_LIB not in sys.path:
sys.path.insert(0, PRIVATE_LIB)
from llama_index.llms.huggingface import HuggingFaceLLM
nest_asyncio.apply()
# 核心组件导入
from llama_index.core import SimpleDirectoryReader, VectorStoreIndex, StorageContext, Settings, PromptTemplate, load_index_from_storage, Document
from llama_index.core.schema import TextNode
from llama_index.embeddings.huggingface import HuggingFaceEmbedding
from llama_index.llms.openai_like import OpenAILike
from llama_index.vector_stores.milvus import MilvusVectorStore
from llama_index.core.node_parser import HierarchicalNodeParser, get_leaf_nodes
from llama_index.core.retrievers import RecursiveRetriever
from llama_index.core.postprocessor import LLMRerank
from llama_index.core.query_engine import RetrieverQueryEngine
from llama_index.core import PropertyGraphIndex
from llama_index.core.indices.property_graph import ImplicitPathExtractor, SimpleLLMPathExtractor
# --- 全局配置参数 ---
DATA_DIR = "/mnt/moark-models/deepseek_papers"
EMBED_PATH = "/mnt/moark-models/Qwen3-Embedding-8B"
LLM_MODEL = "/mnt/moark-models/Qwen3-8B"
ROUTER_MODEL_PATH = "/mnt/moark-models/Qwen3-0.6B"
GRAPH_STORAGE_DIR = "./graph_storage_final"
MILVUS_FILE = "./modular_rag_final.db"
# --- 控制开关 ---
FORCE_REBUILD_VECTOR = False
FORCE_REBUILD_GRAPH = False
# --- 工具函数 ---
def clean_think_tag(text):
text = str(text).strip()
if not text: return "(正在组织语言...)"
if "</think>" in text:
return text.split("</think>")[-1].strip()
return text
def generate_hypothetical_answer(query, llm):
hyde_prompt = f"针对以下问题写一段技术性模拟回答(仅限技术语境):\n问题:{query}\n回答:"
response = llm.complete(hyde_prompt)
return clean_think_tag(response.text)
def get_intent_via_router(user_input, router_llm):
val = user_input.lower()
if any(k in val for k in ["多少篇", "几篇", "清单", "列表", "文件名", "哪些论文", "库里有什么"]): return "META_QUERY"
if any(k in val for k in ["关系", "演进", "对比", "联系", "改进", "区别", "架构"]): return "GRAPH_QUERY"
if any(g in val for g in ["你好", "您好", "你是谁", "在吗"]) or len(val) < 4: return "GREETING"
if any(s in val for s in ["总结", "核心", "大意", "概括", "讲了什么"]): return "SUMMARY"
return "TECH_QUERY"
def verify_relevance(query, response_nodes, llm):
if not response_nodes: return "LOW"
context_preview = "\n".join([n.node.get_content()[:200] for n in response_nodes[:3]])
grade_prompt = f"问题:{query}\n资料:{context_preview}\n判断资料是否包含答案。只回答:[YES] 或 [NO]。"
res = llm.complete(grade_prompt)
return "HIGH" if "[YES]" in res.text.upper() else "LOW"
def rewrite_query_logic(query, llm):
res = llm.complete(f"将问题 '{query}' 改写为2个学术搜索词,分号隔开。")
return res.text.strip()
def compress_context_nodes(query, source_nodes, llm):
refined_facts = []
for node in source_nodes[:3]:
refine_prompt = f"提取与‘{query}’相关的核心技术事实:\n内容:{node.node.get_content()[:800]}"
res = llm.complete(refine_prompt)
fact = clean_think_tag(res.text)
if len(fact) > 15: refined_facts.append(f"• {fact}")
return "\n".join(refined_facts) if refined_facts else "未发现显著事实。"
# --- 核心:数据完整性校验 ---
def validate_knowledge_base(source_docs, vector_index, graph_index):
print("\n" + "="*15 + " [知识库完整性校验报告] " + "="*15)
# 统计独立文件数
unique_files = set([d.metadata.get('file_name', 'unknown') for d in source_docs])
print(f"1. 本地源文件: {len(unique_files)} 篇 PDF (共 {len(source_docs)} 页切片)")
print(f"2. 向量检索库: {'[已加载]' if vector_index else '[未初始化]'}")
graph_status = "未构建"
if graph_index:
try:
triplet_count = len(graph_index.property_graph_store.get_triplets(limit=10000))
graph_status = f"正常 (包含约 {triplet_count}+ 关系边)"
except:
graph_status = "已加载"
print(f"3. 知识图谱: {graph_status}")
if len(source_docs) > 0 and not vector_index:
print("\n[警告] 向量库缺失!")
else:
print("\n[结果] 系统就绪,数据加载完整。")
print("="*50 + "\n")
# --- 核心优化:生成摘要节点 (按文件分组 + 进度条) ---
async def generate_summary_nodes(documents, llm):
# 1. 将页面按文件名分组
doc_map = {}
for doc in documents:
fname = doc.metadata.get('file_name', 'unknown_file')
if fname not in doc_map:
doc_map[fname] = []
doc_map[fname].append(doc)
unique_files = list(doc_map.keys())
print(f">>> 正在处理 {len(unique_files)} 篇独立文档 (已自动合并 {len(documents)} 个页面切片)...")
semaphore = asyncio.Semaphore(5) # 允许5个并发请求
async def process_file_summary(fname):
async with semaphore:
pages = doc_map[fname]
context_text = "\n".join([p.get_content() for p in pages[:5]])[:7000] # 截取前7000字符
prompt = (
f"请阅读以下论文《{fname}》的片段,生成一份包含核心概念、技术架构和关键结论的详细摘要。"
"摘要不需要寒暄,直接输出技术干货,重点描述实体之间的关系。"
f"\n\n原文片段:\n{context_text}..."
)
try:
response = await llm.acomplete(prompt)
summary_text = clean_think_tag(response.text)
base_meta = pages[0].metadata
node = TextNode(text=f"论文《{fname}》的核心摘要:\n{summary_text}", metadata=base_meta)
return node
except Exception as e:
# print(f" × 生成失败 {fname}: {e}") # 进度条模式下减少print
return None
tasks = [process_file_summary(fname) for fname in unique_files]
results = []
for f in tqdm.as_completed(tasks, desc="生成图谱摘要", unit="篇"):
res = await f
if res: results.append(res)
return results
# GraphRAG 构建
async def build_graph_index(summary_nodes, llm):
print(f"\n>>> 正在基于 {len(summary_nodes)} 个摘要节点构建语义图谱...")
# 增加提取数量,因为现在是针对全篇摘要
kg_extractor = SimpleLLMPathExtractor(llm=llm, max_paths_per_chunk=15)
index = PropertyGraphIndex(
summary_nodes,
path_extractors=[kg_extractor, ImplicitPathExtractor()],
llm=llm,
show_progress=True,
embed_model=Settings.embed_model
)
return index
# --- 主程序 ---
async def main():
# --- Step 1: 初始化 ---
print(">>> 正在 启动 64GB 显存...")
Settings.embed_batch_size = 30
Settings.embed_model = HuggingFaceEmbedding(model_name=EMBED_PATH, device="cuda:1", trust_remote_code=True, model_kwargs={"torch_dtype": torch.float16})
Settings.llm = OpenAILike(model=LLM_MODEL, api_base="http://localhost:8000/v1", api_key="fake", is_chat_model=True, timeout=120.0)
router_llm = HuggingFaceLLM(model_name=ROUTER_MODEL_PATH, tokenizer_name=ROUTER_MODEL_PATH, device_map="cuda:1", generate_kwargs={"temperature": 0.0, "max_new_tokens": 15})
reader = SimpleDirectoryReader(input_dir=DATA_DIR, recursive=True)
raw_documents = reader.load_data()
print(f" (正在预检 {len(raw_documents)} 页原始切片...)")
documents = []
for doc in raw_documents:
original_text = doc.get_content() or ""
if original_text:
cleaned_text = original_text.encode('utf-8', 'ignore').decode('utf-8')
new_doc = Document(
text=cleaned_text, metadata=doc.metadata or {},
excluded_embed_metadata_keys=doc.excluded_embed_metadata_keys or [],
excluded_llm_metadata_keys=doc.excluded_llm_metadata_keys or [],
id_=doc.id_
)
documents.append(new_doc)
node_parser = HierarchicalNodeParser.from_defaults(chunk_sizes=[1536, 512, 256])
all_nodes = node_parser.get_nodes_from_documents(documents)
# --- Step 2: 向量库逻辑 ---
vector_index = None
milvus_exists = os.path.exists(MILVUS_FILE) and not FORCE_REBUILD_VECTOR
if milvus_exists:
print(">>> 检测到现有 Milvus 数据库,正在直接加载...")
try:
vector_store = MilvusVectorStore(uri=MILVUS_FILE, dim=4096, overwrite=False)
vector_index = VectorStoreIndex.from_vector_store(vector_store=vector_store)
print(" √ 向量库加载成功!")
except Exception as e:
print(f" × 加载失败 ({e}),准备重建...")
milvus_exists = False
if not milvus_exists:
print(">>> 正在初始化 Milvus 向量库...")
leaf_nodes = []
for n in get_leaf_nodes(all_nodes):
clean_text = "".join(ch for ch in str(n.text) if ch.isprintable()).strip()
if clean_text and clean_text.lower() != "nan":
n.text = clean_text
leaf_nodes.append(n)
vector_store = MilvusVectorStore(uri=MILVUS_FILE, dim=4096, overwrite=True)
vec_storage_context = StorageContext.from_defaults(vector_store=vector_store)
vec_storage_context.docstore.add_documents(all_nodes)
vector_index = VectorStoreIndex(leaf_nodes, storage_context=vec_storage_context, show_progress=True)
print(" √ 向量库构建完成。")
# --- Step 3: 图谱索引加载与重构 ---
graph_index = None
graph_exists = os.path.exists(GRAPH_STORAGE_DIR) and not FORCE_REBUILD_GRAPH
if graph_exists:
print(">>> 检测到本地图谱,正在 尝试加载...")
try:
graph_storage_context = StorageContext.from_defaults(persist_dir=GRAPH_STORAGE_DIR)
graph_index = PropertyGraphIndex(nodes=[], storage_context=graph_storage_context, llm=Settings.llm, embed_model=Settings.embed_model)
print(" √ 本地图谱加载成功。")
except Exception as e:
print(f" × 图谱加载失败 ({e}),准备自动重构...")
graph_exists = False
if not graph_exists:
print(">>> 开始构建新的语义图谱...")
if os.path.exists(GRAPH_STORAGE_DIR): shutil.rmtree(GRAPH_STORAGE_DIR)
# 1. 生成摘要 (带进度条)
summary_nodes = await generate_summary_nodes(documents, Settings.llm)
# 2. 构建图谱
graph_index = await build_graph_index(summary_nodes, Settings.llm)
# 3. 持久化
graph_index.storage_context.persist(persist_dir=GRAPH_STORAGE_DIR)
print(f" √ 图谱构建成功并保存至 {GRAPH_STORAGE_DIR}")
# --- Step 4: 校验与配置 ---
validate_knowledge_base(documents, vector_index, graph_index)
reranker = LLMRerank(llm=Settings.llm, choice_batch_size=10, top_n=5)
query_engine = RetrieverQueryEngine.from_args(
retriever=RecursiveRetriever("vector", retriever_dict={"vector": vector_index.as_retriever(similarity_top_k=10)}, node_dict={node.node_id: node for node in all_nodes}),
node_postprocessors=[reranker]
)
# --- Step 5: 交互循环 ---
print("\n" + "="*50 + "\n粒术 Modular RAG 已就绪!")
chat_history = []
while True:
try:
torch.cuda.empty_cache()
raw_input = input("\n用户 >> ").strip()
if raw_input.lower() in ['exit', 'quit', '退出']: break
if not raw_input: continue
user_input = raw_input.encode('utf-8', 'ignore').decode('utf-8')
intent = get_intent_via_router(user_input, router_llm)
print(f" (意图识别: {intent})")
if intent == "GREETING":
res = Settings.llm.complete(f"你是论文专家粒术。回应问候:{user_input}。")
print(f"\n粒术 >> {clean_think_tag(res.text)}")
elif intent == "META_QUERY":
unique_files = list(set([n.metadata.get('file_name', '未知文档') for n in documents]))
res_text = f"粒术守护着 **{len(unique_files)}** 篇 DeepSeek 论文。\n清单前5项:\n" + "\n".join([f"- {d}" for d in unique_files[:5]])
if len(unique_files) > 5: res_text += f"\n...等共 {len(unique_files)} 篇"
print(f"\n粒术 >> {res_text}")
elif intent == "SUMMARY":
response = query_engine.query(f"请基于全篇论文深度总结:{user_input}")
print(f"\n粒术 >> {clean_think_tag(response.response)}")
elif intent == "GRAPH_QUERY" and graph_index:
print(" (检索语义图谱路径 [基于宏观摘要]...)")
graph_retriever = graph_index.as_retriever(include_text=True, similarity_top_k=3)
nodes = graph_retriever.retrieve(user_input)
graph_context = "\n".join([n.node.get_content() for n in nodes])
res = Settings.llm.complete(f"结合图谱逻辑(基于论文核心摘要)回答:{user_input}\n宏观背景:\n{graph_context}")
print(f"\n粒术(图谱增强) >> {clean_think_tag(res.text)}")
else:
max_retries = 2
attempt, current_query, final_answer = 0, user_input, ""
full_context_query = f"历史:{chat_history[-1]['user']}\n问题:{user_input}" if chat_history else user_input
while attempt < max_retries:
attempt += 1
print(f" (深度融合检索尝试 {attempt}...)")
hyde_doc = generate_hypothetical_answer(full_context_query, Settings.llm)
v_res = query_engine.query(f"问题:{full_context_query}\n背景:{hyde_doc}")
g_nodes = []
if graph_index:
g_nodes = graph_index.as_retriever(include_text=True, similarity_top_k=2).retrieve(user_input)
combined_nodes = v_res.source_nodes + g_nodes
score = verify_relevance(user_input, combined_nodes, Settings.llm)
if score == "HIGH" or attempt == max_retries:
comp_context = compress_context_nodes(user_input, combined_nodes, Settings.llm)
final_prompt = (
"【严格指令】你只能使用资料回答。严禁提及外部信息!\n"
f"参考事实:\n{comp_context}\n问题:{user_input}\n回答:"
)
final_res = Settings.llm.complete(final_prompt)
final_answer = clean_think_tag(final_res.text)
break
current_query = rewrite_query_logic(current_query, Settings.llm)
print(f"\n粒术 >> {final_answer}")
chat_history.append({"user": user_input, "assistant": final_answer})
if len(chat_history) > 5: chat_history.pop(0)
except Exception as e:
print(f"\n 自动修复中: {e}")
if __name__ == "__main__":
asyncio.run(main()) -
按
Ctrl + S保存文件,并完成文件命名test。新建一个终端,输入python test.py,即可进入 Modular RAG 系统。

四、燧原 (Enflame) 部署指南
本章节适用于 燧原 S60 等燧原系列算力卡。
1. 硬件与基础环境
- 算力型号:燧原 S60(48GB) * 2
- 算力主机:bd-suiyuan-node:
vLLM / 0.11.0 / Python 3.12 / ef 1.7.0.14
2. 基础步骤
-
进入算力容器,启动实例后,点击 JupyterLab 进入工作台。
