在RAG(检索和生成)这样的框架内管理和处理多个文档有很大的挑战。关键不仅在于提取相关内容,还在于选择包含用户查询所寻求的信息的适当文档。基于用户查询对齐的多粒度特性,需要动态选择文档,本文将介绍结构化层次检索来解决多文档RAG问题。

一、Llamaindex结构化检索介绍

Llamaindex支持多层次信息检索。它不只是筛选文档,而是利用元数据过滤来简化选择过程。通过使用自动检索机制,这些过滤器可以根据用户查询检索出最相关的文档。这个过程包括推断语义查询,在矢量数据库中确定最佳过滤器集,有效地将文本到SQL和语义搜索的能力结合起来。

二、结构化层次检索的优点

下面介绍Llamaindex提供的结构化分层检索的一些好处:

  1. 增强相关性:通过利用元数据驱动的过滤器,可以准确地识别和检索符合用户查询细微要求的文档。这确保了内容选择中更高的相关性和准确性;

  2. 动态文档选择:与传统的静态文档检索是不同,Llamaindex支持动态文档选择。Llamaindex通过根据相关文档的属性和结构化元数据灵活选择相关文档,智能地适应不同的用户查询;

  3. 高效信息检索:结构化层次检索显著提高了信息检索的效率。通过将文档预处理到元数据字典中并将其存储在矢量数据库中,该系统简化了检索过程,最大限度地减少了计算开销并优化了搜索效率;

  4. 语义查询优化:文本到SQL和语义搜索的融合使系统能够更好地理解用户意图。Llamaindex的自动检索机制将用户查询细化为语义结构,从而能够从文档存储库中精确而细致地检索信息。

三、结构化层次检索代码实现

下面使用Python代码来展示Llamaindex的基本概念,并实现一个结构化的分层检索系统。使用Llamaindex类初始化来管理矢量数据库中的文档元数据。

  • 文档添加add_document方法通过创建包含摘要和关键字等关键信息的元数据字典,将文档添加到Llamaindex;
  • 检索逻辑retrieve_documents方法通过将用户查询与矢量数据库中的元数据过滤器进行匹配来处理用户查询。为了演示目的,使用了一个基本的模拟匹配逻辑;
  • 匹配机制match_metadata方法模拟用户查询和文档元数据之间的匹配过程。这是一个简化的演示逻辑,通常会使用更高级的NLP或语义分析技术。

本示例旨在说明Llamaindex的核心概念,展示如何通过Python中的简化实现来存储文档元数据并基于用户查询检索相关文档。

步骤1:安装库

!pip install llama-index wandb llama_hub weaviate-client --quiet

步骤2:导入库

import osimport openaiimport loggingimport sysfrom IPython.display import Markdown, displayfrom llama_index.llms import OpenAIfrom llama_index.callbacks import CallbackManager, WandbCallbackHandlerfrom llama_index import load_index_from_storageimport pandas as pdfrom llama_index.query_engine import PandasQueryEnginefrom pprint import pprintfrom llama_index import (VectorStoreIndex,SimpleKeywordTableIndex,SimpleDirectoryReader,StorageContext,ServiceContext,)import nest_asyncionest_asyncio.apply()#SetupOPEN API Keyos.environ["OPENAI_API_KEY"] = ""# openai_key = "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" #<--- Your API KEY# openai.api_key = openai_keylogging.basicConfig(stream=sys.stdout, level=logging.INFO)logging.getLogger().addHandler(logging.StreamHandler(stream=sys.stdout))# initialise WandbCallbackHandler and pass any wandb.init argswandb_args = {"project":"llama-index-report"}wandb_callback = WandbCallbackHandler(run_args=wandb_args)# pass wandb_callback to the service contextcallback_manager = CallbackManager([wandb_callback])service_context = ServiceContext.from_defaults(llm=OpenAI(model="gpt-3.5-turbo-0613", temperature=0), chunk_size=1024, callback_manager=callback_manager)

步骤3:下载Githubissues

os.environ["GITHUB_TOKEN"] = ""from llama_hub.github_repo_issues import (GitHubRepositoryIssuesReader,GitHubIssuesClient,)github_client = GitHubIssuesClient()loader = GitHubRepositoryIssuesReader(github_client,owner="run-llama",repo="llama_index",verbose=True,)orig_docs = loader.load_data()limit = 100docs = []for idx, doc in enumerate(orig_docs):doc.metadata["index_id"] = doc.id_if idx >= limit:breakdocs.append(doc)# OutputFound 100 issues in the repo page 1Resulted in 100 documentsFound 100 issues in the repo page 2Resulted in 200 documentsFound 100 issues in the repo page 3Resulted in 300 documentsFound 8 issues in the repo page 4Resulted in 308 documentsNo more issues found, stopping
from copy import deepcopyimport asynciofrom tqdm.asyncio import tqdm_asynciofrom llama_index import SummaryIndex, Document, ServiceContextfrom llama_index.llms import OpenAIfrom llama_index.async_utils import run_jobsasync def aprocess_doc(doc, include_summary: bool = True):"""Process doc."""print(f"Processing {doc.id_}")metadata = doc.metadatadate_tokens = metadata["created_at"].split("T")[0].split("-")year = int(date_tokens[0])month = int(date_tokens[1])day = int(date_tokens[2])assignee = ("" if "assignee" not in doc.metadata else doc.metadata["assignee"])size = ""if len(doc.metadata["labels"]) > 0:size_arr = [l for l in doc.metadata["labels"] if "size:" in l]size = size_arr[0].split(":")[1] if len(size_arr) > 0 else ""new_metadata = {"state": metadata["state"],"year": year,"month": month,"day": day,"assignee": assignee,"size": size,"index_id": doc.id_,}# now extract out summarysummary_index = SummaryIndex.from_documents([doc])query_str = "Give a one-sentence concise summary of this issue."query_engine = summary_index.as_query_engine(service_context=ServiceContext.from_defaults(llm=OpenAI(model="gpt-3.5-turbo")))summary_txt = str(query_engine.query(query_str))new_doc = Document(text=summary_txt, metadata=new_metadata)return new_docasync def aprocess_docs(docs):"""Process metadata on docs."""new_docs = []tasks = []for doc in docs:task = aprocess_doc(doc)tasks.append(task)new_docs = await run_jobs(tasks, show_progress=True, workers=5)# new_docs = await tqdm_asyncio.gather(*tasks)return new_docsnew_docs = await aprocess_docs(docs)# OutputProcessing 9398Processing 9427Processing 9613Processing 9417Processing 9612Processing 8832Processing 9609Processing 9353Processing 9431Processing 9426Processing 9425Processing 9435Processing 9419Processing 9571Processing 9373Processing 9383Processing 9408Processing 9405Processing 9372Processing 9546Processing 9565Processing 9664Processing 9560Processing 9470Processing 9343Processing 9518Processing 9358Processing 8536Processing 9385Processing 9380Processing 9510Processing 9352Processing 9368Processing 7457Processing 8893Processing 9583Processing 9312Processing 7720Processing 9219Processing 9481Processing 9469Processing 9655Processing 9477Processing 9670Processing 9475Processing 9667Processing 9665Processing 9348Processing 9471Processing 9342Processing 9488Processing 9338Processing 9523Processing 9416Processing 7726Processing 9522Processing 9652Processing 9520Processing 9651Processing 7244Processing 9650Processing 9519Processing 9649Processing 9492Processing 9603Processing 9509Processing 9269Processing 9491Processing 8802Processing 9525Processing 9611Processing 9543Processing 8551Processing 9627Processing 9450Processing 9658Processing 9421Processing 9394Processing 9653Processing 9439Processing 9604Processing 9413Processing 9507Processing 9625Processing 9490Processing 9626Processing 9483Processing 9638Processing 7744Processing 9472Processing 8475Processing 9244Processing 9618100%|██████████| 100/100 [02:07<00:00,1.27s/it]

步骤4:将数据加载到Weaviate Vector Store

from llama_index.vector_stores import WeaviateVectorStorefrom llama_index.storage import StorageContextfrom llama_index import VectorStoreIndeximport weaviate# cloudauth_config = weaviate.AuthApiKey(api_key="")client = weaviate.Client("https://.weaviate.network",auth_client_secret=auth_config,)class_name = "LlamaIndex_auto"vector_store = WeaviateVectorStore(weaviate_client=client, index_name=class_name)storage_context = StorageContext.from_defaults(vector_store=vector_store)# Since "new_docs" are concise summaries, we can directly feed them as nodes into VectorStoreIndexindex = VectorStoreIndex(new_docs, storage_context=storage_context)docs[0].metadata# Output{'state': 'open', 'created_at': '2023-12-21T20:18:03Z', 'url': 'https://api.github.com/repos/run-llama/llama_index/issues/9655', 'source': 'https://github.com/run-llama/llama_index/pull/9655', 'labels': ['size:L'], 'index_id': '9655'}

步骤5:对原始文档建立WeaviateIndex

vector_store = WeaviateVectorStore(weaviate_client=client, index_name=doc_class_name)storage_context = StorageContext.from_defaults(vector_store=vector_store)doc_index = VectorStoreIndex.from_documents(docs, storage_context=storage_context)

步骤6:建立自动检索机制

自动检索器的设置过程通过分为以下几个关键步骤:

  1. 定义Schema:定义向量数据库模式,包括元数据字段;

  2. VectorIndexAutoRetriever初始化:实例化此类将创建一个利用压缩元数据索引的检索器。需要定义的Schema作为其输入;

  3. 创建Wrapper Retriever:该步骤主要将每个节点后处理为IndexNode。此转换包含一个链接回源文档的索引ID,此链接支持在后面的部分中进行递归检索,依靠IndexNode对象与下游检索器、查询引擎或其他节点连接。

6(a)定义Schema

from llama_index.vector_stores.types import MetadataInfo, VectorStoreInfovector_store_info = VectorStoreInfo(content_info="Github Issues",metadata_info=[MetadataInfo(name="state",description="Whether the issue is `open` or `closed`",type="string",),MetadataInfo(name="year",description="The year issue was created",type="integer",),MetadataInfo(name="month",description="The month issue was created",type="integer",),MetadataInfo(name="day",description="The day issue was created",type="integer",),MetadataInfo(name="assignee",description="The assignee of the ticket",type="string",),MetadataInfo(name="size",description="How big the issue is (XS, S, M, L, XL, XXL)",type="string",),],)

6(b)实例化VectorIndexAutoRetriever

from llama_index.retrievers import VectorIndexAutoRetrieverretriever = VectorIndexAutoRetriever(index,vector_store_info=vector_store_info,similarity_top_k=2,empty_query_top_k=10,# if only metadata filters are specified, this is the limitverbose=True,)
nodes = retriever.retrieve("Tell me about some issues on 12/11")print(f"Number retrieved: {len(nodes)}")print(nodes[0].metadata)# OutputUsing query str: Using filters: [('month', '==', 12), ('day', '==', 11)]Number retrieved: 6{'state': 'open', 'year': 2023, 'month': 12, 'day': 11, 'assignee': '', 'size': 'XL', 'index_id': '9431'}

6(c)定义Wrapper Retriever

from llama_index.retrievers import BaseRetrieverfrom llama_index.indices.query.schema import QueryBundlefrom llama_index.schema import IndexNode, NodeWithScoreclass IndexAutoRetriever(BaseRetriever):"""Index auto-retriever."""def __init__(self, retriever: VectorIndexAutoRetriever):"""Init params."""self.retriever = retrieverdef _retrieve(self, query_bundle: QueryBundle):"""Convert nodes to index node."""retrieved_nodes = self.retriever.retrieve(query_bundle)new_retrieved_nodes = []for retrieved_node in retrieved_nodes:index_id = retrieved_node.metadata["index_id"]index_node = IndexNode.from_text_node(retrieved_node.node, index_id=index_id)new_retrieved_nodes.append(NodeWithScore(node=index_node, score=retrieved_node.score))return new_retrieved_nodesindex_retriever = IndexAutoRetriever(retriever=retriever)

步骤7:建立递归检索机制

这种类型的检索器将检索器的每个节点连接到另一个检索器、查询引擎或节点。该设置包括将每个汇总的元数据节点链接到与相应文档对应的RAG管道对齐的检索器。

配置过程如下:

  1. 为每个文档定义一个检索器,并把他们添加到字典中;

  2. 定义递归检索器:在参数中定义包括root检索器(汇总元数据检索器)和其他文档检索器。

from llama_index.vector_stores.types import (MetadataFilter,MetadataFilters,FilterOperator,)retriever_dict = {}query_engine_dict = {}for doc in docs:index_id = doc.metadata["index_id"]# filter for the specific doc idfilters = MetadataFilters(filters=[MetadataFilter(key="index_id", operator=FilterOperator.EQ, value=index_id),])retriever = doc_index.as_retriever(filters=filters)query_engine = doc_index.as_query_engine(filters=filters)retriever_dict[index_id] = retrieverquery_engine_dict[index_id] = query_engine
from llama_index.retrievers import RecursiveRetriever# note: can pass `agents` dict as `query_engine_dict` since every agent can be used as a query enginerecursive_retriever = RecursiveRetriever("vector",retriever_dict={"vector": index_retriever, **retriever_dict},# query_engine_dict=query_engine_dict,verbose=True,)nodes = recursive_retriever.retrieve("Tell me about some issues on 12/11")print(f"Number of source nodes: {len(nodes)}")nodes[0].node.metadata# OutputRetrieving with query id None: Tell me about some issues on 12/11Using query str: Using filters: [('month', '==', 12), ('day', '==', 11)]Retrieved node with id, entering: 9431Retrieving with query id 9431: Tell me about some issues on 12/11Retrieving text node: Dev awiss# DescriptionTry to use clickhouse as vectorDB.Try to chunk docs with independent parser service.Special designed schema and tricks for better query and retriever. Fixes # (issue)## Type of ChangePlease delete options that are not relevant.- [ ] Bug fix (non-breaking change which fixes an issue)- [ ] New feature (non-breaking change which adds functionality)- [ ] Breaking change (fix or feature that would cause existing functionality to not work as expected)- [ ] This change requires a documentation update# How Has This Been Tested" />filters = MetadataFilters(filters=[MetadataFilter(key="theme", value="Mafia"),MetadataFilter(key="author", value="Stephen King"),],condition=FilterCondition.OR,)retriever = index.as_retriever(filters=filters)### Version0.9.13### Steps to Reproducenodes = [TextNode(text="The Shawshank Redemption",metadata={"author": "Stephen King","theme": "Friendship",},),TextNode(text="The Godfather",metadata={"director": "Francis Ford Coppola","theme": "Mafia",},),TextNode(text="Inception",metadata={"director": "Christopher Nolan",},),]filters = MetadataFilters(filters=[MetadataFilter(key="theme", value="Mafia"),MetadataFilter(key="author", value="Stephen King"),],condition=FilterCondition.OR,)retriever = index.as_retriever(filters=filters)### Relevant Logs/Tracbacks_No response_Retrieved node with id, entering: 9427Retrieving with query id 9427: Tell me about some issues on 12/11Retrieving text node: [Feature Request]: Postgres BM25 support### Feature DescriptionFeature: add a variation of PGVectorStore which uses ParadeDB's BM25 extension.BM25 is now possible in Postgres with a Rust extension [pg_bm25): https://github.com/paradedb/paradedb/tree/dev/pg_bm25Unsure if it might be better to use [pg_search](https://github.com/paradedb/paradedb/tree/dev/pg_search) and get HNSW at the same time..I'm interested in contributing on this myself, but am just starting to look into it. Interested to hear others' thoughts.### ReasonAlthough the code comments for the PGVectorStore class currently suggest BM25 search is present in Postgres - it is not.### Value of FeatureBM25 retrieval hit rate and MRR is measurable better than Postgres full text search with tsvector and tsquery. Indexing is also supposed to be faster with pg_bm25.Number of source nodes: 6{'state': 'open', 'created_at': '2023-12-11T10:17:52Z', 'url': 'https://api.github.com/repos/run-llama/llama_index/issues/9431', 'source': 'https://github.com/run-llama/llama_index/pull/9431', 'labels': ['size:XL'], 'index_id': '9431'}

步骤8:插入RetrieverQueryEngine

from llama_index.query_engine import RetrieverQueryEnginefrom llama_index import ServiceContextllm = OpenAI(model="gpt-3.5-turbo")service_context = ServiceContext.from_defaults(llm=llm)query_engine = RetrieverQueryEngine.from_args(recursive_retriever, llm=llm)response = query_engine.query("Tell me about some open issues related to agents")print(str(response)) # OutputThere were several issues created on 12/11. One of them is a bug where the metadata filter is not working correctly with Elastic search indexing. Another bug involves an error loading the 'punkt' module in the NLTK library. There are also a couple of feature requests, one for adding Postgres BM25 support and another for making llama-index compatible with models finetuned and hosted on modal.com. Additionally, there is a question about using the Slack Loader with large Slack channels.

四、结论

总之,将Llamaindex集成到多文档RAG架构的结构中预示着信息检索的新时代。它能够基于结构化元数据动态选择文档,再加上语义查询优化的技巧,重塑了我们如何利用庞大文档存储库中的知识,提高了检索过程的效率、相关性和准确性。

参考文献:

[1]https://ai.gopubby.com/structured-hierarchical-retrieval-revolutionizing-multi-document-rag-architectures-f101463db689

[2]https://weaviate.io/developers/wcs/quickstart

[3]https://docs.llamaindex.ai/en/stable/examples/query_engine/multi_doc_auto_retrieval/multi_doc_auto_retrieval.html