LangChain,构建带有Milvus存储的问题分析任务
LangChain简述
LangChain 就是一个 LLM 编程框架,你想开发一个基于 LLM 应用,需要什么组件它都有,直接使用就行;甚至针对常规的应用流程,它利用链(LangChain中Chain的由来)这个概念已经内置标准化方案了。下面我们从新兴的大语言模型(LLM)技术栈的角度来看看为何它的理念这么受欢迎。
参考文档:
一文全面搞懂LangChain
PyMilvus简述
Milvus在我之前的文章中,已经有提到,并且讲述了如何使用Milvus Java SDK来构建存储,但在更广泛的LLM应用的场景中,Python是更为流行的开发语言,因此本文章为了能够快速构建一个基于LLM的分析任务,选择使用Python脚本。
任务目标
用户输入一个问题和相关的统计数据,并且提供了额外的一些文章/文档(PDF, TXT, DOC等),希望LLM能够结合这些信息来回答用户的问题,并给出合理地建议。
数据预处理
对文章切片,并进行向量化存入Milvus。
# 将文件内容,切分成N个块,每个块500个字符,同时连续两个段落之前会重复50字符
def load_and_split_text_file(self, file_path, chunk_size=500, chunk_overlap=50):
file_abs_path = os.path.expanduser(file_path)
filename = os.path.basename(file_abs_path)
loader = TextLoader(file_abs_path)
documents = loader.load()
text_splitter = CharacterTextSplitter(chunk_size=chunk_size, chunk_overlap=chunk_overlap)
docs = text_splitter.split_documents(documents)
segments = []
for doc in docs:
if len(doc.page_content) <= 0:
continue
info = {
"filename": filename,
"text": str(doc.page_content)
}
segments.append(info)
return segments
def insert_info_data(self, infos):
""" 将数据插入到Milvus中。 """
if len(infos) <= 0:
print("empty infos, return")
return
# 构建列式数据集
entities = [
{
"name": "filename",
"type": DataType.VARCHAR,
"values": [info["filename"] for info in infos]
},
{
"name": "title",
"type": DataType.VARCHAR,
"values": [info["title"] for info in infos]
},
{
"name": "embeddings",
"type": DataType.FLOAT_VECTOR,
# 使用AzureOpenAI,对每一个文章分段,进行向量化
"values": [self.embed_by_azureopenai(info["text"]) for info in infos]
},
{
"name": "text",
"type": DataType.VARCHAR,
"values": [info["text"] for info in infos]
}
]
# 插入数据到Milvus
ids = self.insert_data(entities)
# 持久化
self._milvus.flush([self.collection_name])
insert_info_data(load_and_split_text_file('test_document.txt'))
相关文章内容检索
def get_formatted_documents(question: str, document_names: Optional[list[str]]) -> str:
""" 格式化根据相似性搜索得到的文章内容 """
content = transform_documents_to_str(search_documents(question, document_names))
tokens_num = _estimate_tokens_num(content)
if tokens_num > 3000:
# TODO: using map/reduce to generate the summary
#raise RuntimeError(f"Too many tokens {tokens_num} encoded from docs by model {models.model_factory.CHAT_MODEL_DEFAULT.model_name}, hence we will try to output the summary based on your question!")
agent = initialize_agent(
llm=models.model_factory.get_chat_model(temperature=0),
tools=tools.copilot_tools.get_tools(),
agent=AgentType.CHAT_ZERO_SHOT_REACT_DESCRIPTION,
verbose=True,
max_tokens=1000,
handle_parsing_errors=True)
return agent.invoke({
"input": "Summary the documents according to the question:\n\n# question\n{}\n# content\n{}".format(question, "\n".join(documents))
})['output']
return content
def search_documents(question: str, document_names: Optional[list[str]]) -> list[dict]:
""" 根据相似性搜索,查找与question相关的文章内容 """
content = []
try:
if document_names is not None and len(document_names) > 0:
datas = []
for document_name in document_names:
results = milvus_utility.search_by_titles(question, [document_name])
datas.extend(results)
print("data len:{}".format(len(datas)))
return datas
except BaseException:
print(sys.exc_info())
if len(content) > 0:
print("!!!!!!!!!!!search result valid!!!!!!!!!!!!!!")
return content
基于Chain构建工作流
关于Chain,这里一共定义了两大类Chain:
一类是summary_chain
,负责根据问题、数据、相关文章内容,生成一个总结性的报告;一类是output chain
,如publish_chain
,负责对cummary_chain的输出结果再加工,然后转发给客户端。
关于Cache,每一个Chain就是与LLM的一次交互,很容易想到的一个问题是,对于相同的输出内容,实际上它的输出结果应该也是相同的,因此这里需要对每一个Chain进行缓存的判断,但由于是样例代码Cache实现地比较粗糙,仅仅基于threading.Lock
和{}
,故更好的做法是引入第三方的缓存框架。
def summary_by_chain(questionAndData: list, document_names: Optional[list[str]], **kwargs):
# 生成用于描述问题、统计数据以及相关的文章内容的Prompt
question, prompt = PromptGenerator.generate_prompt_for_chain(
questionAndData, document_names, lambda text, docs: documents_helper.get_formatted_documents(text, docs))
# 构建一个回答用户question的工作链
summary_chain = utils.chain_util.debug_wrapper(_construct_chain_with_one_prompt(prompt, question=question, **kwargs), **kwargs)
outputs_chain = RunnableParallel(
# 这个字段缓存了summary_chain的结果
summaries=lambda x: x['input'],
# append a publishing chain to the summary chain
publish_output=lambda x: _generate_publish_output_with_cache(x['input'], **kwargs),
# other chain need input
other_output=lambda x: _generate_output_with_cache(x['input'], **kwargs),
)
# outputs chain's input comes from summary chain
# outputs_chain的输入变量,来自于summary_chain的输出
outputs = outputs_chain.invoke({'input': _generate_summary_with_cache(questionAndData, documents, summary_chain)})
output = {
'documents': document_names,
'referenced_documents': document_names,
'summary': outputs['summaries'],
'dynamics': {
'publish_content': outputs['publish_output'],
'other_output': outputs['other_output']
}
}
print("Output from AI: " + str(output))
return output
def _construct_chain_with_one_prompt(prompt: str, **kwargs):
# 可以提供语言,以使得LLM按指定的语言输出结果
summary_chain = (
{'language': lambda x: kwargs['language']}
| ChatPromptTemplate.from_template(prompt)
| model_factory.CHAT_MODEL_DEFAULT
| StrOutputParser())
final_chain = summary_chain.with_config(tags=['summary_chain'])
return final_chain
def _generate_summary_with_cache(questionAndData, documents, chain, **kwargs):
# 根据问题及数据,找到是否存在缓存,如果有,则直接返回之前已经生成的output,
# 否则才需要执行chain
cached_summary = GLOBAL_CACHE.load(questionAndData=questionAndData, documents=documents, cache_type="summaries")
return cached_summary if cached_summary is not None else GLOBAL_CACHE.save(questionAndData, documents, lambda x: chain.invoke(""), cache_type="summaries")
def _generate_publish_output_with_cache(content, **kwargs):
# 对content进行加密,以此作为识别不同content的依据,利用Cache逻辑,
# 避免不必要的Chat
md5 = hashlib.md5(content.encode('utf-8'))
cache_key = "PUBLISH:" + md5.hexdigest()
return _generate_output_with_cache(cache_key, content, lambda x: PublishTool.get_publish_chain(**kwargs).invoke({'summaries': x}))
方案改进
文章内容检索
由于内容检索的好坏,会由如下几点影响:
- 文章切分好坏程度
- 文章相关性及多相关性
针对影响1:可以考虑使用更加格式化的文档格式,如Markdown, XML等,以避免chunk的切分方式,提升切分后的段落内聚性;
针对影响2:考虑利用LLMChainExtractor
工具,检索到相关文章内容后,通过绑定的LLM对原文章内容做精简,达到去冗余的目的;同时考虑结合MMR(Maximal Marginal Relevance)搜索算法,使返回的文章内容尽量相关且多样化,避免文章内容过于聚焦。
使用Agent模式取代Chain链模式
Chain很明显的一个问题是,我们不得不事先定义好工作链,并为每一个Agent构造合适的Prompt,而无法充分利用LLM模型对任务的拆解能力,理论上可以通过Agent模式,让LLM分析任务、自动调用function call / tool,最终完成整个任务流,以取代Chain的固定模式。
但Agent也有缺点,就是稳定性和时效性。所谓稳定性,是每一次都需要AI交换确认下一步的工作是什么,一旦Clues信息描述不清楚以及输入数据难以理解,很容易产生不符合预期的结果;对于时效性,下一步应该做什么都需要经过一次LLM交互,因此时间必须是远大于Chain链的固定模式。
当然也可以结合Agent和Chain的优点完成工作。
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。 如若内容造成侵权/违法违规/事实不符,请联系我的编程经验分享网邮箱:veading@qq.com进行投诉反馈,一经查实,立即删除!