PR

人間とAIの共進化:LangGraphで実現する賢く進化するRAGエージェント

AI

はじめに

皆さん、こんにちは!

今回は私が最近夢中になっている「成長するRAG(Retrieval Augmented Generation)システム」についてお話しします。

ビジネスの世界では、生成AIの活用が日々重要になってきています。特に、RAGは社内の膨大な情報を効率的に活用する上で欠かせないツールになっています。私自身、LangChainやLlama Index、Difyなど、さまざまな方法でRAGシステムを構築してきました。

その経験を通じて、「RAGって、新しい情報をどんどん取り込んで成長できたらもっと便利になるんじゃないか?」と思うようになりました。そこで今回、LangGraphを使って、学習して成長していくRAGエージェントを作ってみることにしたんです。

このブログでは、そんな「賢く進化するRAG」の作り方と、それがもたらす可能性について、みなさんと一緒に探っていきたいと思います。AIと人間が協力して、どんどん賢くなっていくシステム。

そんなシステム構築に向けた第一歩として、トライしてみた結果をまとめました。

システムの概要

このRAGエージェントは、以下の主要コンポーネントで構成されています

  1. LangGraph: このシステムの中核となるフレームワークで、複雑なAIワークフローを構築し、状態管理や条件分岐を効率的に行います。
  2. OpenAI Embeddings: テキストをベクトル化するための埋め込みモデル。
  3. FAISS: 高速で効率的なベクトル検索を可能にするライブラリ。
  4. LangChain: 様々なAIコンポーネントを統合するためのフレームワーク。RAGの実装に使用されています。

LangGraphを使用することで、このシステムは単なる質問応答システムを超えて、ユーザーとの対話、フィードバックの処理、動的な学習といった複雑なプロセスを柔軟に管理できるようになっています。各ステップ(クエリ処理、フィードバック取得、学習データ処理、RAGシステム更新)がLangGraphのノードとして実装され、それらが有機的に連携することで、自己学習型のRAGエージェントを実現しています。

この構成により、システムは以下のような高度な機能を実現しています:

  1. 動的な知識更新: ユーザーとのインタラクションを通じて新しい情報を学習し、知識ベースを自動的に更新。
  2. 高度な前処理: 入力テキストの自動前処理、チャンク分割、重複排除などを効率的に実行。
  3. AIによる品質評価: 新しく追加される情報はAIによって評価され、関連性や重要度が判断。
  4. 人間とAIの協調: 重要な判断が必要な場合は、人間のオペレーターに判断を仰ぐ機能を実装。

LangGraphを中心としたこの設計により、システムは高い柔軟性と拡張性を持ち、将来的な機能追加や改良も容易に行うことができます。

主要な機能

初期化と質問応答

システムは初期データを読み込み、FAISSを使用してベクトルストアを構築します。ユーザーからの質問に対して、このベクトデータベースを検索し、関連情報を抽出して回答を生成します。

def initialize_rag():
    loader = TextLoader("./rag_initial_data.txt", encoding='utf-8')
    documents = loader.load()
    text_splitter = CharacterTextSplitter(chunk_size=300, chunk_overlap=50)
    texts = text_splitter.split_documents(documents)
    vectorstore = FAISS.from_documents(texts, embedding)
    qa = RetrievalQA.from_chain_type(
        llm=llm,
        chain_type="stuff",
        retriever=vectorstore.as_retriever(),
        return_source_documents=False,
    )
    return qa, vectorstore

今回は学習する動きを確認するために、”rag_initial_data.txt”というテキストを準備しました。このテキストには、ひとまずRAGについて回答できるようにネットで出てきた情報を付加しています。

RAG(Retrieval-Augmented Generation:検索拡張生成)とは、大規模言語モデル(LLM)によるテキスト生成に外部情報の検索を組み合わせることで、回答の精度や信頼性を向上させる技術です

ユーザーフィードバックの処理

システムは、生成した回答に対するユーザーのフィードバックを収集します。回答が不十分だった場合、ユーザーから追加情報を取得します。

def get_user_feedback(state: State) -> State:
    print(f"AI: {state['response']}")
    feedback = input("この回答は十分でしたか? (yes/no): ")
    state["feedback"] = feedback
    return state

def process_feedback(state: State) -> State:
    if state["feedback"].lower() == "no":
        learning_data = input("追加で学習させたい情報を入力してください: ")
        state["learning_data"] = learning_data
    return state

動的な知識ベースの更新

ユーザーから提供された新しい情報は、システムの知識ベースに動的に追加されます。これにより、システムは継続的に学習し、その回答能力を向上させていきます。

def update_rag(state: State) -> State:
    if "learning_data" in state and state["learning_data"]:
        global vectorstore, learned_data
        new_texts = [state["learning_data"]]
        vectorstore.add_texts(new_texts)
        learned_data.append(state["learning_data"])
        print("RAGシステムが更新されました。")
    return state

学習内容の表示

ユーザーは「学習内容を教えて」または「学習内容を表示」と入力することで、システムが学習した新しい情報を確認することができます。

FAISSのベクトルデータを参照し、テキストに戻すことは難しいようなので、学習したデータをリストで残しておき、そのデータからテキストを出力するようにしています。

def query_rag(state: State) -> State:
    if state["query"].lower() in ["学習内容を教えて", "学習内容を表示"]:
        state["show_learned_data"] = True
        state["response"] = "学習内容を表示します。"
    else:
        response = qa.invoke({"query": state["query"]})
        state["response"] = response["result"]
        state["show_learned_data"] = False
    return state

自己学習型RAGエージェントの動作解説

実際に動作させてみた結果です。

  1. 初期知識の確認
    最初の質問「RAGとは?」に対して、システムは正確な回答を提供しました。これは、RAGに関する基本的な情報がすでにシステムの知識ベースに含まれていたことを示しています。
  2. 未知の情報への対応
    次の質問「GraphRAGとは?」に対して、システムは情報がないことを正直に認めました。これは、システムが自身の知識の限界を認識し、誤った情報を提供することを避ける能力を示しています。
  3. ユーザーフィードバックと学習
    システムの回答が不十分だったため、「no」とフィードバックしました。これを受けて、システムは追加の学習データを要求しました。
  4. 新しい情報の取り込み
    ユーザーが提供したGraphRAGに関する詳細な情報を、システムは学習データとして取り込みました。「RAGシステムが更新されました。」というメッセージは、この新しい情報が正常に処理され、知識ベースに追加されたことを示しています。
  5. 学習後の性能向上
    同じ質問「GraphRAGとは?」を再度行ったところ、システムは今度は詳細で正確な回答を提供しました。これは、システムが新しく学習した情報を効果的に統合し、利用できるようになったことを示しています。
  6. 継続的な学習の可能性
    このプロセスは繰り返し行うことができ、システムは新しい情報や更新された情報を継続的に学習していく能力があります。
質問を入力してください (終了するには 'exit' と入力):  RAGとは?
AI: RAG(Retrieval-Augmented Generation:検索拡張生成)とは、大規模言語モデル(LLM)によるテキスト生成に外部情報の検索を組み合わせることで、回答の精度や信頼性を向上させる技術です。
この回答は十分でしたか? (yes/no):  yes

質問を入力してください (終了するには 'exit' と入力):  GraphRAGとは?
AI: GraphRAGについては、情報がありませんので、わかりません。
この回答は十分でしたか? (yes/no):  no

追加で学習させたい情報を入力してください:  GraphRAGは、X(旧Twitter)上で注目を集めている検索強化生成(RAG)の新しいアプローチです。以下にGraphRAGに関する主な情報をまとめます。  ## GraphRAGの概要  GraphRAGは、グラフベースのエンティティ抽出を活用した高度なRAG戦略です[2]。従来のRAGと比較して、プライベートデータや未見のデータに対する質問応答を大幅に改善する手法とされています[3]。  ## 主要な特徴  - **グラフベースアプローチ**: GraphRAGは知識グラフを活用し、ベクトル検索と組み合わせることで豊富なコンテキストを提供します[8]。  - **エンティティ抽出**: グラフベースのエンティティ抽出を使用して、より高度な情報検索を実現します[2]。  - **ローカルvs.グローバル**: GraphRAGは、ローカルとグローバルの両方のコンテキストを考慮に入れた戦略を採用しています[2]。  ## 業界での注目  GraphRAGは、AI業界で急速に注目を集めています。  - **カンファレンスでの人気**: AI Engineer World Fairでは、GraphRAGのセッションが満員の盛況ぶりを見せました[7]。  - **専門家の関心**: Microsoft AIやNeo4jなどの大手企業の専門家がGraphRAGについて議論しています[2][8]。  - **実装の進展**: HaystackとNeo4jを組み合わせたGraphRAGの実装が紹介されています[8]。  ## 今後の展望  GraphRAGは、RAGの分野で重要な進展をもたらす可能性があります。AI専門家のBen Loricaは、GraphRAGの最新情報を定期的に共有しており、この技術の発展を注視しています[4]。  GraphRAGは、より高度な情報検索と質問応答システムの実現に向けた有望なアプローチとして、今後さらなる発展が期待されています。

RAGシステムが更新されました。

質問を入力してください (終了するには 'exit' と入力):  GraphRAGとは?
AI: GraphRAGは、グラフベースのエンティティ抽出を活用した高度な検索強化生成(RAG)戦略です。従来のRAGと比較して、プライベートデータや未見のデータに対する質問応答を大幅に改善する手法とされています。GraphRAGは知識グラフを活用し、ベクトル検索と組み合わせることで豊富なコンテキストを提供し、より高度な情報検索を実現します。また、ローカルとグローバルの両方のコンテキストを考慮に入れた戦略を採用しています。AI業界で急速に注目を集めており、専門家や大手企業が関心を持っています。今後、RAGの分野で重要な進展をもたらす可能性があります。
この回答は十分でしたか? (yes/no):  yes
質問を入力してください (終了するには 'exit' と入力):  exit
プログラムを終了します。

この例は、成長するRAGエージェントの主要な特徴である適応性と継続的学習能力を明確に示しています。システムは初期の知識不足を、ユーザーとの対話を通じて迅速に補完し、その結果、より正確で詳細な情報を提供できるようになりました。これは、AIシステムが人間との協働を通じて、動的に知識を拡張し、パフォーマンスを向上させる可能性を示す興味深い実例です。

システムの利点

  1. 継続的学習: ユーザーとの対話を通じて、システムは常に新しい情報を学習し、知識ベースを拡張します。
  2. 適応性: 特定のドメインや組織固有の情報を学習することで、カスタマイズされた回答が可能になります。
  3. 透明性: 学習した内容を表示する機能により、システムの知識の成長過程を追跡できます。
  4. 効率的な情報検索: FAISSを使用した高速ベクトル検索により、大規模なデータセットからも迅速に関連情報を抽出できます。

まとめ

今回の内容はいかがだったでしょうか。

RAGをもっと実践的に活用するためにエージェントを使うことに挑戦してみました。

エージェントは「自立して行動するAI」というのはどんな場面で役に立つのか、少しヒントをもらえた気がしています。漠然とAIエージェントは必要になるんだろうなというのは感じていましたが、自分で作ることで理解が深まりました。

また今回の仕様としては、まだまだ実用的ではない部分が残っています。ちゃんとベクトルデータベースを使って学習したデータを保持できるようし、もっと実際に使う場面での使いやすさを向上させ、役立つRAGシステムを構築できるように進化させていきたいと思います。

ノートブック全文

import faiss
import numpy as np
from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from langchain.vectorstores import FAISS
from langchain.document_loaders import TextLoader
from langchain.text_splitter import CharacterTextSplitter
from langchain.chains import RetrievalQA
from langgraph.graph import StateGraph, END
from typing import Dict, TypedDict, List, Annotated
import os
import re

openai_api_key = "API_KEY"

llm = ChatOpenAI(
    model="gpt-4o-mini",
    api_key=openai_api_key,
    temperature=0,
)

embedding = OpenAIEmbeddings(
    model="text-embedding-3-small",
    api_key=openai_api_key
)     

# 学習内容を保存するためのリスト
learned_data: List[str] = []

# RAGシステムの初期化を行う関数
def initialize_rag():
    # テキストデータを読み込むためのローダーを作成
    loader = TextLoader("./rag_initial_data.txt", encoding='utf-8')
    documents = loader.load()  # ドキュメントを読み込む

    # ドキュメントを分割するためのテキストスプリッターを作成
    text_splitter = CharacterTextSplitter(chunk_size=300, chunk_overlap=50)
    texts = text_splitter.split_documents(documents)  # ドキュメントを分割

    embeddings = embedding  # 埋め込みモデルを指定
    vectorstore = FAISS.from_documents(texts, embeddings)  # ベクトルストアを作成

    # 質問応答システムを作成
    qa = RetrievalQA.from_chain_type(
        llm=llm,  # 大規模言語モデルを指定
        chain_type="stuff",  # チェーンタイプを指定
        retriever=vectorstore.as_retriever(),  # リトリーバーを設定
        return_source_documents=False,  # ソースドキュメントの返却を無効化
    )
    return qa, vectorstore  # 作成した質問応答システムとベクトルストアを返す

# RAGシステムを初期化
qa, vectorstore = initialize_rag()

# エージェントの状態を定義するためのクラス
class State(TypedDict):
    query: str  # ユーザーの質問
    response: str  # AIの回答
    feedback: str  # ユーザーからのフィードバック
    learning_data: str  # 学習用のデータ
    show_learned_data: bool  # 学習内容を表示するかどうか

# ユーザーの質問に応じてRAGシステムを利用する関数
def query_rag(state: State) -> State:
    if state["query"].lower() in ["学習内容を教えて", "学習内容を表示"]:
        # "学習内容を教えて" または "学習内容を表示" という質問に対して学習内容を表示する
        state["show_learned_data"] = True
        state["response"] = "学習内容を表示します。"
    else:
        # 通常の質問に対してRAGシステムを利用して回答を生成
        response = qa.invoke({"query": state["query"]})
        state["response"] = response["result"]
        state["show_learned_data"] = False
    return state

# ユーザーからのフィードバックを取得する関数
def get_user_feedback(state: State) -> State:
    if state["show_learned_data"]:
        # 学習内容を表示する場合
        if learned_data:
            print("学習内容:")
            for i, data in enumerate(learned_data, 1):
                print(f"{i}. {data}")  # 保存された学習内容を順番に表示
        else:
            print("まだ追加の学習内容はありません。")
        state["feedback"] = "yes"  # 学習内容表示後は常に "yes" とする
        return state
    
    # 学習内容を表示しない場合、AIの回答を表示し、フィードバックを取得
    print(f"AI: {state['response']}")
    feedback = input("この回答は十分でしたか? (yes/no): ")
    state["feedback"] = feedback  # フィードバックを保存
    return state

# ユーザーのフィードバックに基づいて追加の学習データを取得する関数
def process_feedback(state: State) -> State:
    if state["show_learned_data"]:
        return state  # 学習内容を表示している場合、何もせず戻る
    if state["feedback"].lower() == "no":
        # "no" のフィードバックがあった場合、追加の学習データを取得
        learning_data = input("追加で学習させたい情報を入力してください: ")
        state["learning_data"] = learning_data  # 学習データを保存
    return state

# RAGシステムを更新する関数
def update_rag(state: State) -> State:
    if state["show_learned_data"]:
        return state  # 学習内容を表示している場合、何もせず戻る
    if "learning_data" in state and state["learning_data"]:
        # 新しい学習データがある場合、RAGシステムを更新
        global vectorstore, learned_data
        new_texts = [state["learning_data"]]
        embeddings = embedding  # 埋め込みモデルを再指定
        vectorstore.add_texts(new_texts)  # 新しいテキストをベクトルストアに追加
        learned_data.append(state["learning_data"])  # 新しい学習データをリストに追加
        print("RAGシステムが更新されました。")
    return state

# 状態遷移を定義するためのグラフを作成
workflow = StateGraph(State)

# 各関数をグラフのノードとして追加
workflow.add_node("query_rag", query_rag)
workflow.add_node("get_user_feedback", get_user_feedback)
workflow.add_node("process_feedback", process_feedback)
workflow.add_node("update_rag", update_rag)

# 各ノード間のエッジを定義
workflow.add_edge("query_rag", "get_user_feedback")
workflow.add_edge("get_user_feedback", "process_feedback")
workflow.add_edge("process_feedback", "update_rag")
    
# エントリーポイントの設定
workflow.set_entry_point("query_rag")

# グラフをコンパイルして実行可能なアプリケーションを作成
app = workflow.compile()

# メインループ
while True:
    # ユーザーからの質問を取得
    user_query = input("質問を入力してください (終了するには 'exit' と入力): ")
    if user_query.lower() == 'exit':
        break

    # 初期状態を作成し、ワークフローを実行
    initial_state = State(query=user_query, response="", feedback="", learning_data="", show_learned_data=False)
    for output in app.stream(initial_state):
        pass

print("プログラムを終了します。")

コメント

タイトルとURLをコピーしました