StreamlitとRAGで作る 実用的なAIチャットボット(6) ストリーミング応答とチャット履歴の管理

Streamlit
この記事は約21分で読めます。

こんにちは、JS2IIUです。
連載「StreamlitとRAGで作る:実用的なAIチャットボット開発ガイド」の第6回です。

前回は、StreamlitとLangChain、そしてVector Storeを統合し、独自のドキュメントに基づいて回答する基本的なRAGアプリを完成させました。これで機能的には「動く」ものができました。

しかし、実際に使ってみて少し違和感を感じませんでしたか?

「質問をしてから回答が出るまで、数秒間画面がフリーズしたように止まる」
「『さっきの件だけど』と聞き返しても、前の会話を覚えていない」

これらは、アプリケーションのユーザー体験(UX)における致命的な課題です。現代のユーザーはChatGPTのような「スムーズな応答」と「文脈の理解」に慣れきっています。

今回は、この2つの壁を突破します。
LLMの回答をリアルタイムに文字送りする「ストリーミング応答」と、会話の流れを記憶する「チャット履歴の管理」を実装し、アプリの品質を一気にプロダクションレベルへ引き上げましょう。今回もよろしくお願いします。

1. はじめに:なぜ「待ち時間」と「忘却」が問題なのか

AIチャットボットにおいて、ユーザーが最もストレスを感じるのは「待たされること」です。
特にRAGの場合、「検索(Retrieval)」と「生成(Generation)」の2つの処理時間がかかります。たとえ合計で5秒だとしても、Webの世界での5秒は永遠のように長く感じられます。

しかし、もし最初の1秒で「はい、その件については…」と文字が出始めたらどうでしょうか? ユーザーは読み始めることができるため、心理的な待ち時間はほぼゼロになります。

また、会話の履歴も重要です。
「社内規定について教えて」→(回答)→「それはどこに保存されていますか?」
この「それ」が何を指すのか、以前のボットは理解できませんでした。今回は、人間と同じように文脈を理解できるボットへと進化させます。

2. 技術解説:ストリーミング(Streaming)の仕組み

Invoke vs Stream

これまでの実装では、LangChainの invoke() メソッドを使用していました。これは、LLMが回答を最後まで作り終わってから、まとめて結果を返す方式です。

一方、ストリーミング(Stream)は、LLMがトークン(単語の一部)を生成するたびに、即座にその断片を送信する方式です。

  • Invoke: (3秒待機)… 「ポテトが好きです」
  • Stream: 「ポ」…「テ」…「ト」…「が」…「好」…(以下続く)

st.write_stream の魔法

以前のStreamlitでは、このストリーミング表示を実装するために複雑なコールバック関数(CallbackHandler)を書く必要がありました。しかし、最新のバージョンでは st.write_stream という機能が追加されています。

この関数は、Pythonのジェネレータ(次々に値を出すイテレータ)を渡すだけで、あのChatGPTのようなタイプライター風の表示を自動で行ってくれます。

詳しくはこちらの記事を参考にして下さい。

【Streamlit】st.write_stream()でデータストリームを表示する | アマチュア無線局JS2IIU
https://js2iiu.com/2025/01/13/streamlitst-write_stream/

3. 技術解説:記憶(Memory)の仕組み

LLM自体は「ステートレス」であり、過去の記憶を持ちません。APIを呼び出すたびに、真っ白な状態で始まります。
では、なぜChatGPTは会話ができるのでしょうか?

答えはシンプルです。「過去のやり取りを全て、毎回プロンプトに含めて送信している」からです。

1回目の送信:

ユーザー: こんにちは

2回目の送信:

ユーザー: こんにちは
AI: こんにちは!何かお手伝いしましょうか?
ユーザー: Pythonについて教えて

このように、履歴(History)を積み上げていくことで、疑似的に記憶を持っているように振る舞わせます。今回は、Streamlitの st.session_state に会話ログを保存し、それをLangChainのプロンプトに注入する方式をとります。

4. 実装Step 1:会話履歴を管理する

まずは、会話履歴を保存する準備と、プロンプトの修正です。

RAG用のプロンプトに、検索結果(context)だけでなく、会話履歴(chat_history)が入る場所を作ります。

Python
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder

# 以前のテンプレート
# template = """Context: {context} Question: {question}"""

# 履歴対応版のテンプレート
# MessagesPlaceholderを使うことで、リスト形式のメッセージを動的に挿入できます
prompt = ChatPromptTemplate.from_messages([
    ("system", "あなたはコンテキストに基づいて回答するアシスタントです。\n\n# コンテキスト\n{context}"),
    MessagesPlaceholder(variable_name="chat_history"), # ここに履歴が入る
    ("human", "{question}"),
])

5. 実装Step 2:ストリーミング出力を実装する

次に、LangChainのストリーミング実行と、Streamlitの描画をつなぎます。

Python
# chainの定義(仮)
chain = prompt | llm | StrOutputParser()

# ユーザー入力
query = "Streamlitとは?"

# .stream() メソッドでジェネレータを取得
response_generator = chain.stream({
    "context": "検索されたドキュメント...",
    "chat_history": [],
    "question": query
})

# Streamlitで表示
# これだけで、リアルタイムに文字がパラパラと表示されます
final_response = st.write_stream(response_generator)

st.write_stream は、描画を行いつつ、最終的に完成した全テキストを戻り値(final_response)として返してくれます。これを履歴に保存すればよいわけです。非常に効率的です。

6. 実装Step 3:統合コード(RAG + 履歴 + ストリーミング)

それでは、前回のコードをベースに、これらの機能を組み込んだ完成版コードを見てみましょう。

このコードでは、以下の処理フローを実現しています。

  1. 履歴のロード: st.session_state から過去の会話を取得。
  2. フォーマット変換: LangChainが理解できる形式(HumanMessage, AIMessage)に変換。
  3. 検索 & 生成: ユーザーの質問と過去の履歴を元に回答を生成(ストリーミング)。
  4. 履歴の保存: 新しい会話を st.session_state に追記。
Python
import streamlit as st
from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from langchain_community.vectorstores import FAISS
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_core.output_parsers import StrOutputParser
from langchain_core.messages import HumanMessage, AIMessage
from dotenv import load_dotenv

# 環境変数の読み込み
load_dotenv()

# --- 設定 ---
st.set_page_config(page_title="RAG Chatbot Advanced", layout="wide")
st.title("RAGチャットボット (Streaming & History)")

# --- 1. リソースの読み込み (Cache) ---
@st.cache_resource
def load_retriever():
    embeddings = OpenAIEmbeddings(model="text-embedding-3-small")
    try:
        # 第4回で作成したインデックスを読み込む
        vectorstore = FAISS.load_local(
            "faiss_index", 
            embeddings, 
            allow_dangerous_deserialization=True
        )
        return vectorstore.as_retriever(search_kwargs={"k": 3})
    except Exception as e:
        return None

retriever = load_retriever()

if not retriever:
    st.error("Vector Storeが見つかりません。第4回のコードを実行してインデックスを作成してください。")
    st.stop()

# --- 2. プロンプトとLLMの準備 ---
# 履歴(chat_history)を受け取れるように構造化します
prompt = ChatPromptTemplate.from_messages([
    ("system", """
    あなたは優秀なアシスタントです。
    以下の「コンテキスト」と「会話履歴」に基づいて、ユーザーの質問に答えてください。
    コンテキストにない情報は答えず、「わかりません」と伝えてください。

    # コンテキスト
    {context}
    """),
    MessagesPlaceholder(variable_name="chat_history"),
    ("human", "{question}"),
])

# ストリーミング対応のために streaming=True は必須ではありませんが、明示しておくと良いでしょう
llm = ChatOpenAI(model_name="gpt-4o-mini", temperature=0)

# --- 3. チャット履歴の管理 (Session State) ---
if "messages" not in st.session_state:
    st.session_state.messages = []

# 過去の会話をUIに表示
for message in st.session_state.messages:
    with st.chat_message(message["role"]):
        st.markdown(message["content"])

# --- 4. 履歴データの整形関数 ---
# LangChainに渡すために、辞書型の履歴をMessageオブジェクトのリストに変換します
def get_chat_history_features(messages):
    history_buffer = []
    for msg in messages:
        if msg["role"] == "user":
            history_buffer.append(HumanMessage(content=msg["content"]))
        elif msg["role"] == "assistant":
            history_buffer.append(AIMessage(content=msg["content"]))
    return history_buffer

# --- 5. メインチャットループ ---
if query := st.chat_input("質問を入力してください..."):

    # ユーザー入力の表示と保存
    st.session_state.messages.append({"role": "user", "content": query})
    with st.chat_message("user"):
        st.markdown(query)

    # AI応答の処理
    with st.chat_message("assistant"):

        # 検索(ここは同期処理で待つ必要があります)
        relevant_docs = retriever.invoke(query)
        context_text = "\n\n".join([d.page_content for d in relevant_docs])

        # 履歴オブジェクトの作成
        # 直前のユーザー入力は prompt の {question} に入るため、履歴からは除外しても良いですが、
        # ここでは過去分すべてを渡す形にします(現在進行中の質問は除く)
        chat_history = get_chat_history_features(st.session_state.messages[:-1])

        # チェーンの定義
        chain = prompt | llm | StrOutputParser()

        # ストリーミング実行
        # stream() メソッドはジェネレータ(文字列の断片を次々に出すもの)を返します
        response_generator = chain.stream({
            "context": context_text,
            "chat_history": chat_history,
            "question": query
        })

        # st.write_stream でリアルタイム表示しつつ、完了後の全テキストを取得
        response_text = st.write_stream(response_generator)

        # 履歴に追加
        st.session_state.messages.append({"role": "assistant", "content": response_text})

        # (オプション)参照元の表示
        # ストリーミング完了後にソースを表示します
        with st.expander("参照元の情報を表示"):
            for doc in relevant_docs:
                st.markdown(f"- {doc.page_content[:100]}...")
                st.caption(f"Source: {doc.metadata.get('source', 'Unknown')}")

コードの重要ポイント解説

  1. MessagesPlaceholder("chat_history"):
    プロンプトテンプレートの中で、「ここに過去の会話リストをガバっと挿入してね」という指定です。これにより、LangChainがリストを展開してプロンプトに組み込んでくれます。
  2. get_chat_history_features 関数:
    st.session_state{"role": "user", "content": "..."} という辞書で保存していますが、LangChainは HumanMessageAIMessage という専用のクラスを期待します。この変換を行う関数です。
  3. chain.stream(...):
    ここが核心です。.invoke() の代わりに .stream() を使うことで、応答が一度に来るのではなく、イテレータとして返ってきます。
  4. st.write_stream(response_generator):
    受け取ったイテレータを画面に描画します。以前のStreamlitでは、テキストが更新されるたびに再描画のチラつきを気にする必要がありましたが、この関数は内部で最適化されており、非常に滑らかに表示されます。

7. まとめと次回予告

お疲れ様でした!
今回実装した「ストリーミング」と「履歴管理」によって、あなたのRAGアプリは、単なる実験コードから、「ユーザーが使いたくなるアプリケーション」へと進化しました。

  • リアルタイムに文字が表示される心地よさ。
  • 「さっきの話だけど」が通じる賢さ。

これらは、モダンなAIアプリにおける必須要件です。Streamlitなら、わずか数行の追加でこれが実現できることに驚かれたかもしれません。

さて、UI/UXは向上しましたが、肝心の「回答の精度」はどうでしょうか?
現状の検索システムは、「キーワードが似ているかどうか」だけで判断しています。しかし、ユーザーの質問が曖昧だったり、複雑だったりすると、適切なドキュメントが見つからないことがあります。

次回、第7回「RAGの精度を極める:高度なリトリーバル戦略(Multi-Query, Re-ranking)」では、検索の精度を劇的に向上させるためのプロフェッショナルな手法に踏み込みます。

  • 質問をAIに書き直させて検索漏れを防ぐ(Multi-Query)。
  • 検索結果を別のAIが再審査して順位をつけ直す(Re-ranking)。

最後まで読んでいただきありがとうございます。

連載記事リンク

コメント

タイトルとURLをコピーしました