使用 Elasticsearch 嵌套密集向量支持

这个交互式笔记本将:

  • 将模型 “sentence-transformers__all-minilm-l6-v2” 从 Hugging Face 加载到 Elasticsearch ML Node 中
  • 使用 LangChain 分割器将段落分块成句子,并使用嵌套密集向量将它们索引到 Elasticsearch 中
  • 执行搜索并返回包含最相关段落的文档

依赖关系

在本笔记本中,我们将使用 Langchain 和 Elasticsearch python 客户端。

我们还需要一个正在运行的 Elasticsearch 实例,并在其中部署了 ML 节点和模型。

python3 -m pip install -qU langchain elasticsearch eland load_dotenv jq

安装

安装 Elasticsearch 及 Kibana

如果你还没有安装好自己的 Elasticsearch 及 Kibana,那么请参考一下的文章来进行安装:

  • 如何在 Linux,MacOS 及 Windows 上进行安装 Elasticsearch

  • Kibana:如何在 Linux,MacOS 及 Windows 上安装 Elastic 栈中的 Kibana

在安装的时候,请选择 Elastic Stack 8.x进行安装。在安装的时候,我们可以看到如下的安装信息:

环境变量

在启动 Jupyter 之前,我们设置如下的环境变量:

export ES_USER="elastic"export ES_PASSWORD="xnLj56lTrH98Lf_6n76y"export ES_ENDPOINT="localhost"

请在上面修改相应的变量的值。

拷贝 Elasticsearch 证书

我们把 Elasticsearch 的证书拷贝到当前的目录下:

$ pwd/Users/liuxg/python/elser$ cp ~/elastic/elasticsearch-8.12.0/config/certs/http_ca.crt .$ ls http_ca.crt http_ca.crt

准备数据

我们在项目的根目录下创建如下的文件:

workplace-docs.json

[{"content": "Effective: March 2020\nPurpose\n\nThe purpose of this full-time work-from-home policy is to provide guidelines and support for employees to conduct their work remotely, ensuring the continuity and productivity of business operations during the COVID-19 pandemic and beyond.\nScope\n\nThis policy applies to all employees who are eligible for remote work as determined by their role and responsibilities. It is designed to allow employees to work from home full time while maintaining the same level of performance and collaboration as they would in the office.\nEligibility\n\nEmployees who can perform their work duties remotely and have received approval from their direct supervisor and the HR department are eligible for this work-from-home arrangement.\nEquipment and Resources\n\nThe necessary equipment and resources will be provided to employees for remote work, including a company-issued laptop, software licenses, and access to secure communication tools. Employees are responsible for maintaining and protecting the company's equipment and data.\nWorkspace\n\nEmployees working from home are responsible for creating a comfortable and safe workspace that is conducive to productivity. This includes ensuring that their home office is ergonomically designed, well-lit, and free from distractions.\nCommunication\n\nEffective communication is vital for successful remote work. Employees are expected to maintain regular communication with their supervisors, colleagues, and team members through email, phone calls, video conferences, and other approved communication tools.\nWork Hours and Availability\n\nEmployees are expected to maintain their regular work hours and be available during normal business hours, unless otherwise agreed upon with their supervisor. Any changes to work hours or availability must be communicated to the employee's supervisor and the HR department.\nPerformance Expectations\n\nEmployees working from home are expected to maintain the same level of performance and productivity as if they were working in the office. Supervisors and team members will collaborate to establish clear expectations and goals for remote work.\nTime Tracking and Overtime\n\nEmployees are required to accurately track their work hours using the company's time tracking system. Non-exempt employees must obtain approval from their supervisor before working overtime.\nConfidentiality and Data Security\n\nEmployees must adhere to the company's confidentiality and data security policies while working from home. This includes safeguarding sensitive information, securing personal devices and internet connections, and reporting any security breaches to the IT department.\nHealth and Well-being\n\nThe company encourages employees to prioritize their health and well-being while working from home. This includes taking regular breaks, maintaining a work-life balance, and seeking support from supervisors and colleagues when needed.\nPolicy Review and Updates\n\nThis work-from-home policy will be reviewed periodically and updated as necessary, taking into account changes in public health guidance, business needs, and employee feedback.\nQuestions and Concerns\n\nEmployees are encouraged to direct any questions or concerns about this policy to their supervisor or the HR department.\n","summary": "This policy outlines the guidelines for full-time remote work, including eligibility, equipment and resources, workspace requirements, communication expectations, performance expectations, time tracking and overtime, confidentiality and data security, health and well-being, and policy reviews and updates. Employees are encouraged to direct any questions or concerns","name": "Work From Home Policy","url": "./sharepoint/Work from home policy.txt","created_on": "2020-03-01","category": "teams","_run_ml_inference": true,"rolePermissions": ["demo", "manager"]},{"content": "Starting May 2022, the company will be implementing a two-day in-office work requirement per week for all eligible employees. Please coordinate with your supervisor and HR department to schedule your in-office workdays while continuing to follow all safety protocols.\n","summary": "Starting May 2022, employees will need to work two days a week in the office. Coordinate with your supervisor and HR department for these days while following safety protocols.","name": "April Work From Home Update","url": "./sharepoint/April work from home update.txt","created_on": "2022-04-29","category": "teams","_run_ml_inference": true,"rolePermissions": ["demo", "manager"]},{"content": "As we continue to prioritize the well-being of our employees, we are making a slight adjustment to our hybrid work policy. Starting May 1, 2023, employees will be required to work from the office three days a week, with two days designated for remote work. Please communicate with your supervisor and HR department to establish your updated in-office workdays.\n","summary": "Starting May 1, 2023, our hybrid work policy will require employees to work from the office three days a week and two days remotely.","name": "Wfh Policy Update May 2023","url": "./sharepoint/WFH policy update May 2023.txt","created_on": "2023-05-01","category": "teams","_run_ml_inference": true,"rolePermissions": ["demo", "manager"]},{"content": "Executive Summary:\nThis sales strategy document outlines the key objectives, focus areas, and action plans for our tech company's sales operations in fiscal year 2024. Our primary goal is to increase revenue, expand market share, and strengthen customer relationships in our target markets.\n\nI. Objectives for Fiscal Year 2024\n\nIncrease revenue by 20% compared to fiscal year 2023.\nExpand market share in key segments by 15%.\nRetain 95% of existing customers and increase customer satisfaction ratings.\nLaunch at least two new products or services in high-demand market segments.\n\nII. Focus Areas\nA. Target Markets:\nContinue to serve existing markets with a focus on high-growth industries.\nIdentify and penetrate new markets with high potential for our products and services.\n\nB. Customer Segmentation:\nStrengthen relationships with key accounts and strategic partners.\nPursue new customers in underserved market segments.\nDevelop tailored offerings for different customer segments based on their needs and preferences.\n\nC. Product/Service Portfolio:\nOptimize the existing product/service portfolio by focusing on high-demand solutions.\nDevelop and launch innovative products/services in emerging technology areas.\nEnhance post-sales support and customer service to improve customer satisfaction.\n\nIII. Action Plans\nA. Sales Team Development:\nExpand the sales team to cover new markets and industries.\nProvide ongoing training to sales staff on product knowledge, sales techniques, and industry trends.\nImplement a performance-based incentive system to reward top performers.\n\nB. Marketing and Promotion:\nDevelop targeted marketing campaigns for different customer segments and industries.\nLeverage digital marketing channels to increase brand visibility and lead generation.\nParticipate in industry events and trade shows to showcase our products and services.\n\nC. Partner Ecosystem:\nStrengthen existing partnerships and establish new strategic alliances to expand market reach.\nCollaborate with partners on joint marketing and sales initiatives.\nProvide partner training and support to ensure they effectively represent our products and services.\n\nD. Customer Success:\nImplement a proactive customer success program to improve customer retention and satisfaction.\nDevelop a dedicated customer support team to address customer inquiries and concerns promptly.\nCollect and analyze customer feedback to identify areas for improvement in our products, services, and processes.\n\nIV. Monitoring and Evaluation\nEstablish key performance indicators (KPIs) to track progress toward our objectives.\nConduct regular sales team meetings to review performance, share best practices, and address challenges.\nConduct quarterly reviews of our sales strategy to ensure alignment with market trends and adjust as needed.\n\nBy following this sales strategy for fiscal year 2024, our tech company aims to achieve significant growth and success in our target markets, while also providing exceptional value and service to our customers.\n","summary": "This sales strategy document outlines objectives, focus areas, and action plans for our tech company's sales operations in fiscal year 2024. Our primary goal is to increase revenue, expand market share, and strengthen customer relationships in our target markets. Focus areas include targeting new markets, segmenting customers, enhancing","name": "Fy2024 Company Sales Strategy","url": "./sharepoint/FY2024 Company Sales Strategy.txt","category": "teams","created_on": "2023-04-15","_run_ml_inference": true,"rolePermissions": ["demo", "manager"]},{"content": "Purpose\n\nThe purpose of this vacation policy is to outline the guidelines and procedures for requesting and taking time off from work for personal and leisure purposes. This policy aims to promote a healthy work-life balance and encourage employees to take time to rest and recharge.\nScope\n\nThis policy applies to all full-time and part-time employees who have completed their probationary period.\nVacation Accrual\n\nFull-time employees accrue vacation time at a rate of [X hours] per month, equivalent to [Y days] per year. Part-time employees accrue vacation time on a pro-rata basis, calculated according to their scheduled work hours.\n\nVacation time will begin to accrue from the first day of employment, but employees are eligible to take vacation time only after completing their probationary period. Unused vacation time will be carried over to the next year, up to a maximum of [Z days]. Any additional unused vacation time will be forfeited.\nVacation Scheduling\n\nEmployees are required to submit vacation requests to their supervisor at least [A weeks] in advance, specifying the start and end dates of their vacation. Supervisors will review and approve vacation requests based on business needs, ensuring adequate coverage during the employee's absence.\n\nEmployees are encouraged to plan their vacations around the company's peak and non-peak periods to minimize disruptions. Vacation requests during peak periods may be subject to limitations and require additional advance notice.\nVacation Pay\n\nEmployees will receive their regular pay during their approved vacation time. Vacation pay will be calculated based on the employee's average earnings over the [B weeks] preceding their vacation.\nUnplanned Absences and Vacation Time\n\nIn the event of an unplanned absence due to illness or personal emergencies, employees may use their accrued vacation time, subject to supervisor approval. Employees must inform their supervisor as soon as possible and provide any required documentation upon their return to work.\nVacation Time and Termination of Employment\n\nIf an employee's employment is terminated, they will be paid out for any unused vacation time, calculated based on their current rate of pay.\nPolicy Review and Updates\n\nThis vacation policy will be reviewed periodically and updated as necessary, taking into account changes in labor laws, business needs, and employee feedback.\nQuestions and Concerns\n\nEmployees are encouraged to direct any questions or concerns about this policy to their supervisor or the HR department.\n","summary": ": This policy outlines the guidelines and procedures for requesting and taking time off from work for personal and leisure purposes. Full-time employees accrue vacation time at a rate of [X hours] per month, equivalent to [Y days] per year. Vacation requests must be submitted to supervisors at least","name": "Company Vacation Policy","url": "https://enterprisesearch.sharepoint.com/:t:/s/MSBuilddemo/ES6rw9bKZxVBobG1WUoJpikBF9Bhx1pw_GvJWbsg-Z_HNA" />
$ pwd/Users/liuxg/python/elser$ ls workplace-docs.json workplace-docs.json

创建应用并展示

我们在当前的目录下打入如下的命令来创建 notebook:

$ pwd/Users/liuxg/python/elser$ jupyter notebook

连接到 Elasticsearch

from elasticsearch import Elasticsearchfrom dotenv import load_dotenvimport osfrom elasticsearch import Elasticsearchload_dotenv() elastic_user=os.getenv('ES_USER')elastic_password=os.getenv('ES_PASSWORD')elastic_endpoint=os.getenv("ES_ENDPOINT")url = f"https://{elastic_user}:{elastic_password}@{elastic_endpoint}:9200"client = Elasticsearch(url, ca_certs = "./http_ca.crt", verify_certs = True) print(client.info())

从上面的输出我们可以看到连接是成功的。

准备数据集

我们将使用 Langchain 的工具来摄取原始文档并将其分割成更小的块。 我们正在使用示例工作场所搜索数据集。

LangChain 还有许多其他加载器可以从其他来源获取数据。 有关更多信息,请参阅其核心加载程序或加载程序集成。

import json # Load data into a JSON objectwith open('workplace-docs.json') as f: data = json.load(f) print(f"Successfully loaded {len(data)} documents")with open('temp.json', 'w') as json_file:json.dump(data, json_file)

上面的代码在项目的根目录下生成一个叫做 temp.json 文件。

从 huggingface 加载模型

你需要的第一件事是一个模型,用于从块中创建文本嵌入,你可以使用任何您想要的东西,但此示例将在 minilm-l6-v2 模型上端到端运行。我们可以使用 eland 库上传文本嵌入模型。

MODEL_ID = "sentence-transformers__all-minilm-l6-v2"!eland_import_hub_model --url https://elastic:xnLj56lTrH98Lf_6n76y@localhost:9200 \--hub-model-id sentence-transformers/all-MiniLM-L6-v2 \--task-type text_embedding \--ca-cert ./http_ca.crt \--clear-previous \--start

你需要根据自己的 Elasticsearch 配置修改上面的用户名及密码。整个下载过程需要一定的时间。我们可以打开 Kibana 来查看模型的下载:

从上面的输出中,我们可以看到部署是成功的。

创建 Elasticsearch 索引

在此示例中,我们将使用管道进行推理并将嵌入存储在索引中。

在此示例中,我们使用句子 Transformers minilm-l6-v2 模型,你需要在 ML 节点上运行该模型。 通过这个模型,我们设置一个 index_pipeline 来进行推理并将嵌入存储在我们的索引中。

PIPELINE_ID = "chunk_text_to_passages"MODEL_DIMS = 384INDEX_NAME = "nb_parent_retriever_index"# Create the pipelineclient.ingest.put_pipeline(id=PIPELINE_ID, processors=[{"foreach": {"field": "passages","processor": {"inference": {"field_map": {"_ingest._value.text": "text_field"},"model_id": MODEL_ID,"target_field": "_ingest._value.vector","on_failure": [{"append": {"field": "_source._ingest.inference_errors","value": [{"message": "Processor 'inference' in pipeline 'ml-inference-title-vector' failed with message '{{ _ingest.on_failure_message }}'","pipeline": "ml-inference-title-vector","timestamp": "{{{ _ingest.timestamp }}}"}]}}]}}}}])# Create the indexclient.indices.create( index=INDEX_NAME, settings={"index": {"default_pipeline": PIPELINE_ID}},mappings={"dynamic": "true","properties": {"passages": {"type": "nested","properties": {"vector": {"properties": {"predicted_value": {"type": "dense_vector","index": True,"dims": MODEL_DIMS,"similarity": "dot_product"}}}}}}})

请注意上面的 nested 类型字段。

实用工具:父子分割函数

该函数将一个文档拆分为多个段落,并返回父文档和子段落。

它还可以选择将父文档分块为更小的文档,这意味着父文档将被拆分为多个索引文档。 我们将在示例 2 中使用它。

from langchain.text_splitter import RecursiveCharacterTextSplitterdef parent_child_splitter(documents, chunk_size: int = 200):child_splitter = RecursiveCharacterTextSplitter(chunk_size=chunk_size)docs = []for i, doc in enumerate(documents):passages = []for _doc in child_splitter.split_documents([doc]):passages.append({"text": _doc.page_content,})doc = {"content": doc.page_content,"metadata": doc.metadata,"passages": passages}docs.append(doc)return docs

实用工具:漂亮的响应

此函数将以更易于阅读的格式打印出 Elasticsearch 的响应。

def pretty_response(response, show_parent_text=False):if len(response['hits']['hits']) == 0:print('Your search returned no results.')else:for hit in response['hits']['hits']:id = hit['_id']score = hit['_score']doc_title = hit['_source']["metadata"]['name']parent_text = ""if show_parent_text:parent_text = hit['_source']["content"]passage_text = ""for passage in hit['inner_hits']['passages']['hits']['hits']:passage_text += passage["fields"]["passages"][0]['text'][0] + "\n\n"pretty_output = (f"\nID: {id}\nDoc Title: {doc_title}\nparent text:\n{parent_text}\nPassage Text:\n{passage_text}\nScore: {score}\n")print(pretty_output)print("---")

完整文档,嵌套段落

在此示例中,我们将文档拆分为多个段落,并将完整文档存储为父文档。 然后,我们将这些段落存储为嵌套文档,并带有返回父文档的链接。

下面我们使用父子拆分器将完整文档拆分为段落。 Parent_child_splitter fn 返回文档列表,其中包含嵌套段落数组。

然后我们将这些文档索引到 Elasticsearch 中。 这将为完整文档建立索引,并且段落将存储在嵌套字段中。

然后,我们的索引管道处理器将对段落运行推理,并将嵌入存储在索引中。

from elasticsearch import helperschunked_docs = parent_child_splitter(loader.load(), chunk_size=600)count, errors = helpers.bulk(client, chunked_docs,index=INDEX_NAME)print(f"Indexed {count} documents with {errors} errors")import timetime.sleep(5)

我们可以在 Kibana 中查看摄入文档的格式:

做 nested搜索

我们现在可以执行嵌套搜索,以查找与我们的查询匹配的段落,这些段落将在 inner_hits 中返回。 在下面的示例中,每个父文档仅请求一个段落。

response = client.search(index=INDEX_NAME, knn={"inner_hits": {"size": 1,"_source": False,"fields": ["passages.text"]},"field": "passages.vector.predicted_value","k": 5,"num_candidates": 100,"query_vector_builder": {"text_embedding": {"model_id": "sentence-transformers__all-minilm-l6-v2","model_text": "Whats the work from home policy" />

使用 LangChain 来搜索

我们还可以通过调整查询在 Langchain 内执行此搜索。

我们还重写 doc_builder 以使用段落而不是完整文档填充 site_content。

from langchain.vectorstores.elasticsearch import ElasticsearchStore, ApproxRetrievalStrategyfrom typing import List, Unionfrom langchain_core.documents import Documentclass CustomRetrievalStrategy(ApproxRetrievalStrategy):def query(self,query: Union[str, None],filter: List[dict],**kwargs,):es_query = {"knn": {"inner_hits": {"_source": False,"fields": ["passages.text"]},"field": "passages.vector.predicted_value","filter": filter,"k": 5,"num_candidates": 100,"query_vector_builder": {"text_embedding": {"model_id": "sentence-transformers__all-minilm-l6-v2","model_text": query}}}}return es_queryvector_store = ElasticsearchStore(index_name=INDEX_NAME,es_connection=client,query_field="content",strategy=CustomRetrievalStrategy(),)def doc_builder(hit):passage_hits = hit.get("inner_hits", {}).get("passages", {}).get("hits", {}).get("hits", [])page_content = ""for passage_hit in passage_hits:passage_fields = passage_hit.get("fields", {}).get("passages", [])[0]page_content += passage_fields.get("text", [])[0] + "\n\n"return Document(page_content=page_content,metadata=hit["_source"]["metadata"],)results = vector_store.similarity_search(query="Whats the work from home policy" />

整个 notebook 的源代码可以在地址下载:https://github.com/liu-xiao-guo/semantic_search_es/blob/main/document_chunking_with_langchain_document_splitters.ipynb