StreamlitとRAGで作る 実用的なAIチャットボット(8) LangChain Expression Language (LCEL)によるパイプラインのモジュール化

Streamlit
この記事は約26分で読めます。

こんにちはJS2IIUです。

「StreamlitとRAGで作る:実用的なAIチャットボット開発ガイド」の連載も、いよいよ終盤の第8回に突入しました。

前回は、Multi-QueryやRe-rankingといった高度な検索戦略を導入し、RAGの精度を劇的に向上させました。しかし、機能を追加するにつれて、コードの中に「接続処理」や「データ変換処理」が散らばり、少し読みづらくなってきてはいませんか?

「まずは検索して、その結果をリストから文字列に変換して、それをプロンプトの変数に入れて、LLMに投げて、結果を文字列だけ取り出す…」

このような一連の処理を、Pythonの関数呼び出しだけで記述していると、処理の流れ(パイプライン)が見えにくくなります。

そこで今回は、LangChainの真骨頂である LCEL (LangChain Expression Language) を解説します。これを使うと、複雑なRAGパイプラインを「宣言的」かつ「直感的」に記述できるようになり、コードの可読性と保守性が飛躍的に向上します。

コードを見やすく整理し、プロフェッショナルな実装スタイルを身につけましょう。今回もよろしくお願いします。

1. はじめに:なぜLCELが必要なのか?

これまでのLangChain(v0.0系)では、LLMChainRetrievalQA といった専用のクラスを使ってチェーンを構築していました。しかし、これらはブラックボックスになりがちで、カスタマイズしようとすると途端に複雑なコードを書く必要がありました。

LCELは、これを解消するために導入された新しい記述法です。UnixやLinuxのコマンドラインを使ったことがある方なら、パイプ演算子 | をご存知でしょう。

Bash
cat file.txt | grep "AI" | sort

「ファイルを開き、”AI”を含む行を抽出し、並べ替える」。データが左から右へと流れていく様子が直感的にわかります。

LCELもこれと同じ思想です。
「プロンプト | LLM | 出力パーサー」 のように、コンポーネントをパイプで繋ぐだけで処理フローを定義できます。

しかも、LCELで書かれたチェーンは自動的に以下の機能を獲得します。

  • ストリーミング対応: .stream() がすぐに使えます。
  • 非同期対応: .ainvoke() などの非同期メソッドが自動実装されます。
  • 並列処理: 複数の処理を自動的に並列化してくれます。

2. LCEL基本構文を理解する

まずは、最もシンプルな構成でLCELの書き方を見てみましょう。「ジョーク生成機」を作ります。

Python
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser

# 1. コンポーネントの準備
model = ChatOpenAI(model_name="gpt-4o-mini")
prompt = ChatPromptTemplate.from_template("{topic}に関するジョークを1つ言ってください。")
output_parser = StrOutputParser() # AIのメッセージオブジェクトから文字列部分だけを取り出す

# 2. チェーンの定義 (LCEL)
# ここが重要! パイプ `|` で繋ぐだけです
chain = prompt | model | output_parser

# 3. 実行
result = chain.invoke({"topic": "AIエンジニア"})
print(result)

非常にすっきりしていますね。
prompt の出力が model の入力になり、model の出力が output_parser の入力になります。この一連の流れが chain という一つのオブジェクト(Runnable)になります。

3. RAGパイプラインをLCELで書き直す

では、本題のRAGパイプラインをLCELで再構築してみましょう。
RAGには「検索(Retrieval)」というステップが含まれるため、単純な一直線のパイプラインにはなりません。「入力を分岐させて、片方で検索し、もう片方はそのままパスする」という処理が必要です。

ここで登場するのが RunnablePassthroughRunnableParallel です。

データの流れを整理する

RAGのプロンプトは、context(検索結果)と question(質問)の2つの入力を待ち受けています。

Python
template = """以下のコンテキストに基づいて質問に答えてください:
{context}

質問: {question}"""

これに対して、チェーンの入力はユーザーの「質問文(文字列)」ひとつだけです。
つまり、チェーンの中で以下のようなデータ変換を行う必要があります。

  1. 入力: “Streamlitとは?”
  2. 並列処理:
    • context: 入力を使ってRetrieverを検索 → ドキュメントリストを取得 → 文字列に結合
    • question: 入力をそのまま通過させる
  3. プロンプト: 上記2つを受け取ってメッセージを作成
  4. LLM: 生成
  5. 出力: 文字列化

実装コード

Python
from langchain_core.runnables import RunnablePassthrough, RunnableParallel
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from langchain_community.vectorstores import FAISS

# --- 準備(これまでの回と同じ) ---
embeddings = OpenAIEmbeddings(model="text-embedding-3-small")
vectorstore = FAISS.load_local("faiss_index", embeddings, allow_dangerous_deserialization=True)
retriever = vectorstore.as_retriever()
llm = ChatOpenAI(model_name="gpt-4o-mini", temperature=0)

template = """以下のコンテキストに基づいて質問に答えてください:
{context}

質問: {question}"""
prompt = ChatPromptTemplate.from_template(template)

# --- 補助関数 ---
def format_docs(docs):
    return "\n\n".join([d.page_content for d in docs])

# --- LCELによるRAGチェーン ---
rag_chain = (
    # 1. データの準備フェーズ (RunnableParallelと等価な辞書記法)
    {
        # contextは、入力をretrieverに渡し、結果をformat_docsに通す
        "context": retriever | format_docs,
        # questionは、入力をそのまま通す
        "question": RunnablePassthrough()
    }
    # 2. プロンプトへ流し込む
    | prompt
    # 3. LLMへ
    | llm
    # 4. 文字列として出力
    | StrOutputParser()
)

# 実行
response = rag_chain.invoke("Streamlitの特徴は?")
print(response)

出力:

Python
Streamlitの特徴は、Pythonだけでフロントエンドを構築できることです。また、ステート管理には `st.session_state` を使用します。

解説:
最初の辞書 { "context": ..., "question": ... } がポイントです。LCELでは辞書を使うと自動的に並列処理(RunnableParallel)として扱われます。
retriever は入力された質問を使って検索を行い、RunnablePassthrough() は入力された質問をそのままコピーして下流(プロンプトの {question})に渡します。

4. 履歴(History)付きRAGへの拡張

第6回で実装した「会話履歴付きRAG」も、LCELを使えば美しく記述できます。
履歴は外部(Streamlitのsession_state)から注入する必要があるため、RunnablePassthrough.assign を使うと便利です。

Python
from langchain_core.runnables import RunnablePassthrough, RunnableParallel
from langchain_core.messages import HumanMessage, AIMessage
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from langchain_community.vectorstores import FAISS

# --- 準備(これまでの回と同じ) ---
embeddings = OpenAIEmbeddings(model="text-embedding-3-small")
vectorstore = FAISS.load_local("faiss_index", embeddings, allow_dangerous_deserialization=True)
retriever = vectorstore.as_retriever()
llm = ChatOpenAI(model_name="gpt-4o-mini", temperature=0)

# 履歴フォーマット関数
def format_history(messages):
    formatted = []
    for msg in messages:
        if msg["role"] == "user":
            formatted.append(HumanMessage(content=msg["content"]))
        else:
            formatted.append(AIMessage(content=msg["content"]))
    return formatted

# 新しいプロンプト(履歴入り)
history_prompt = ChatPromptTemplate.from_messages([
    ("system", "あなたはコンテキストに基づいて回答するAIです。\n\n{context}"),
    ("placeholder", "{chat_history}"),
    ("human", "{question}"),
])

# --- 補助関数 ---
def format_docs(docs):
    return "\n\n".join([d.page_content for d in docs])

# チェーンの定義
rag_with_history_chain = (
    RunnablePassthrough.assign(
        # 入力辞書の中から "chat_history" キーの値を取り出し、フォーマットする
        chat_history=lambda x: format_history(x["chat_history"]),
        # 入力辞書の中から "question" キーの値を取り出し、Retrieverで検索する
        context=lambda x: x["question"] | retriever | format_docs
    )
    | history_prompt
    | llm
    | StrOutputParser()
)

# 実行時は辞書で渡す
# rag_with_history_chain.invoke({
#     "question": "それはいつ公開されましたか?",
#     "chat_history": [{"role": "user", "content": "Streamlitについて教えて"}, ...]
# })

RunnablePassthrough.assign は、入力された辞書データに、新しいキーと値(ここでは検索結果や整形された履歴)を追加(assign)して下流に流す機能です。これにより、データの流れが非常に明確になります。

5. フォールバック(Fallback)による堅牢性の向上

本番環境では、LLMのAPIが一時的にダウンしたり、特定のモデルでエラーが出たりすることがあります。
LCELには .with_fallbacks() という強力なメソッドがあり、「メインのチェーンが失敗したら、予備のチェーンを実行する」というロジックを簡単に書けます。

Python
# メインモデル(例: 最新だが不安定なモデルや、コンテキスト長制限が厳しいモデル)
primary_llm = ChatOpenAI(model_name="gpt-4-turbo")

# バックアップモデル(例: 安定している旧モデル)
backup_llm = ChatOpenAI(model_name="gpt-3.5-turbo")

# チェーンの一部だけを切り替えられるように定義
primary_chain = prompt | primary_llm | StrOutputParser()
backup_chain = prompt | backup_llm | StrOutputParser()

# フォールバック付きチェーン
final_chain = primary_chain.with_fallbacks([backup_chain])

# これを実行すると、primaryでエラーが起きれば自動的にbackupが動く
# final_chain.invoke(...)

この機能は、APIエラーだけでなく、トークン長オーバーのエラー対策(長いコンテキストに対応できるモデルへ切り替えるなど)にも有効です。

6. Streamlitアプリへの統合

それでは、LCELでモジュール化したパイプラインを app.py に組み込みましょう。
コードの見通しが良くなるため、メインのロジックがシンプルになります。

Python
import streamlit as st
from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from langchain_community.vectorstores import FAISS
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnablePassthrough
from dotenv import load_dotenv

load_dotenv()

st.set_page_config(page_title="RAG App with LCEL")
st.title("RAG with LCEL")

# --- 1. リソース準備 ---
@st.cache_resource
def init_components():
    embeddings = OpenAIEmbeddings(model="text-embedding-3-small")
    try:
        vectorstore = FAISS.load_local("faiss_index", embeddings, allow_dangerous_deserialization=True)
        retriever = vectorstore.as_retriever(search_kwargs={"k": 3})
    except:
        st.error("Index not found.")
        return None, None

    llm = ChatOpenAI(model_name="gpt-4o-mini", temperature=0)
    return retriever, llm

retriever, llm = init_components()

if not retriever:
    st.stop()

# --- 2. LCELチェーンの定義 ---
template = """以下のコンテキストに基づいて質問に答えてください:
{context}

質問: {question}"""
prompt = ChatPromptTemplate.from_template(template)

def format_docs(docs):
    return "\n\n".join([d.page_content for d in docs])

# チェーン構築
rag_chain = (
    {"context": retriever | format_docs, "question": RunnablePassthrough()}
    | prompt
    | llm
    | StrOutputParser()
)

# --- 3. チャットUI ---
if "messages" not in st.session_state:
    st.session_state.messages = []

for msg in st.session_state.messages:
    with st.chat_message(msg["role"]):
        st.markdown(msg["content"])

if query := st.chat_input("質問を入力..."):
    st.session_state.messages.append({"role": "user", "content": query})
    with st.chat_message("user"):
        st.markdown(query)

    with st.chat_message("assistant"):
        # LCELのおかげで、streamもそのまま呼べる!
        response_stream = rag_chain.stream(query)
        full_response = st.write_stream(response_stream)

    st.session_state.messages.append({"role": "assistant", "content": full_response})

7. まとめと次回予告

今回は、LangChain Expression Language (LCEL) を使って、RAGパイプラインをモダンで読みやすい形にリファクタリングしました。

  • 宣言的記述: | を使ってデータの流れを視覚的に表現できる。
  • Runnable: 全てのコンポーネントが共通のインターフェースを持つため、組み合わせが容易。
  • 並列処理: 辞書定義などで自動的にパラレル実行される。
  • 堅牢性: with_fallbacks でエラーハンドリングも簡単。

これでコードの保守性は高まりましたが、実運用を考えるとまだ解決すべき課題があります。
例えば、「ユーザーがアップロードした数百ページのPDFを解析したい」となった場合、Streamlitのメインプロセスでそれを実行すると、処理が終わるまでブラウザが固まってしまいます。

次回、第9回「プロダクション対応:長時間処理と非同期処理の設計」では、このような重い処理をバックグラウンドで実行し、ユーザーを待たせないための非同期処理設計について解説します。Webアプリとしての一歩進んだ設計論になります。

最後まで読んでいただきありがとうございます。

連載記事リンク

コメント

タイトルとURLをコピーしました