こんにちは、JS2IIUです。
Streamlitで複数のユーザーが同時に利用する環境では、スケーラビリティやパフォーマンスの問題に直面することがあります。この記事では、スケーラビリティとパフォーマンスの課題をクリアし、Streamlitアプリをより効果的に運用するためのベストプラクティスについてみていきます。今回もよろしくお願いします。
1. はじめに
Streamlitは、Pythonで書かれたアプリケーションをインタラクティブなウェブアプリとして簡単にデプロイできるツールです。データサイエンスやマシンラーニングの分野で特に重宝されており、複雑なビジュアライゼーションを手軽に実装可能です。この記事では、Streamlitを利用する際のスケーラビリティやパフォーマンスの課題に重点を置き、これらの課題を克服する方法について詳しく解説します。
2. Streamlitの基本的な特徴と利点
Streamlitは、リアルタイムに結果を表示しながらアプリを作れる点をはじめとする多くの特長があります。Streamlitが提供する直感的なAPIは、短いコードで素早くアプリケーションを開発することを可能にし、主な利点として以下が挙げられます。
- シンプルなデプロイ: AWSやHerokuなどを利用して簡単に公開可能。
- 直感的なUI: わずかなコードで複雑なインターフェースが構築できる。
Streamlitの利点
- シンプルな導入プロセス
- リアルタイムなインタラクション
- オープンソースで無料
3. 同時利用に伴うスケーラビリティとパフォーマンスの問題
Streamlitは、複数のユーザーが同時にアクセスすることを考慮して設計されていますが、規模が大きくなるとパフォーマンスの問題が生じることがあります。この際にはCPUやメモリのリソースが不足し、レスポンスが遅くなることがあります。特に以下の点に注意が必要です。
- セッションあたりのメモリ使用
- CPU負荷
4. ユーザーごとのセッション管理とその重要性
Streamlitでは、各ユーザーセッションを独立して管理することが重要です。セッション管理を正しく行うことで、アプリケーションの安定性を保つことができます。ここで役立つのが、st.session_stateです。
st.session_stateの活用法
import streamlit as st
# カウンタをセッションごとに保存する
if 'counter' not in st.session_state:
st.session_state.counter = 0
def increment_counter():
st.session_state.counter += 1
# ボタンを押すたびにカウンタを増加
st.button('Increase', on_click=increment_counter)
st.write(f"Counter: {st.session_state.counter}")コード解説
基礎的な部分も含めて説明します。
import streamlit as st- Streamlitライブラリをインポートします。これによりWebアプリを簡単に作成できます。
if 'counter' not in st.session_state:
st.session_state.counter = 0st.session_stateは、ユーザーごとに状態(変数)を保持できる仕組みです。- 初回実行時に
'counter'というキーが存在しない場合、初期値として0を代入します。
def increment_counter():
st.session_state.counter += 1- カウンターの値を 1 増やす関数です。
- ボタンが押されたときに実行されます。
st.button('Increase', on_click=increment_counter)- 「Increase」ボタンを表示し、クリックされたら
increment_counter関数が呼ばれます。
st.write(f"Counter: {st.session_state.counter}")- 現在のカウンターの値を表示します。
結果として
ボタンをクリックするたびに「Counter: n」の n が増えていきます。
セッションごとに値が保存されるので、リロードしても保持されます(同じユーザー・ブラウザであれば)。
5. データ競合の基本概念と発生原因
データ競合は、複数のリクエストが同じデータに同時にアクセスする際に発生します。これにより、データの一貫性が失われ、最悪の場合データ破壊を招くことがあります。データ競合は主に以下のような状況で発生します。
- 複数のユーザーが同時に同じデータを更新
- 長時間のデータプロセッシング
6. 競合状態を防ぐためのベストプラクティス
競合状態を防ぐための一般的な方法として、ロック機構やトランザクション管理があります。これにより、1つのリソースに同時にアクセスしようとする競合を防ぎます。
ロック機構の例
import threading
lock = threading.Lock()
def update_shared_resource():
with lock:
# 安全な更新処理
passコード解説
このコードは、Python の threading モジュールを使ってスレッド間での競合を防ぐためのロック処理を実装しています。
import threading- Python の標準ライブラリ
threadingをインポートします。マルチスレッド処理をするためのモジュールです。
lock = threading.Lock()- 排他ロック(Lock)を作成します。
- 他のスレッドがこのリソースを同時に使えないように制御するためのものです。
def update_shared_resource():
with lock:
# 安全な更新処理
passwith lock:によってロックを取得します。- 他のスレッドがこのロックを使っている場合は、解放されるまで待機します。
# 安全な更新処理の部分には、共有リソースの変更処理などを安全に書けます。withを使うことで、処理が終われば自動的にロックが解放されます(lock.release()の代わり)。
7. コード例、デバッグ方法、パフォーマンス最適化のヒント
Streamlitを使ったアプリケーションはシンプルに作れる一方で、パフォーマンスが劣化する原因は複数存在します。本章では、実践的なコード例を交えながら、ボトルネックの発見とパフォーマンス最適化の手法、そしてデバッグの進め方について解説します。
7.1 不要な再実行の抑制:st.cache / st.cache_data の活用
Streamlitはインタラクションがあるたびにスクリプト全体を再実行します。そのため、重たい計算処理や外部リソースへのアクセスはキャッシュ化することで高速化できます。
st.cache_data を使ったデータ読み込みの例
import streamlit as st
import pandas as pd
@st.cache_data
def load_data():
# 時間がかかる処理(例:CSV読み込み)
df = pd.read_csv('large_dataset.csv')
return df
df = load_data()
st.write(df.head())@st.cache_dataを使うと、関数の出力をキャッシュとして保持します。- 同じ関数引数であれば再実行を避けて高速にレスポンスできます。
注意:データが頻繁に変化するケースではキャッシュの使用を慎重に。
7.2 再実行の回避に st.experimental_memo(旧 st.cache)も活用可能
より一時的なキャッシュを使いたい場合や、ロジックによるキャッシュの更新制御が必要なときは、st.experimental_memo も検討してください。
7.3 重い処理の並列化・バックグラウンド処理
外部APIへのアクセスや機械学習の推論など、処理時間がかかる部分をスレッドで非同期化することでUIの応答性を保つことができます。
スレッドでバックグラウンド実行する例
import streamlit as st
import threading
import time
def long_task():
time.sleep(5)
st.session_state["result"] = "処理完了!"
if st.button("長い処理を実行"):
st.session_state["result"] = "実行中..."
threading.Thread(target=long_task).start()
st.write(st.session_state.get("result", ""))- スレッドを使って非同期的に処理を実行。
- UI側は即座にレスポンスを返すことで、ユーザーの体感速度を向上できます。
7.4 デバッグとプロファイリングの手法
Streamlitは通常のPythonアプリケーションと同じく、ログ出力やプロファイリングを使ってボトルネックを特定できます。
ログを活用する(標準の logging モジュール)
import logging
import streamlit as st
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
logger.info("データ読み込み開始")
# 実際の処理
st.write("Hello Streamlit")- ターミナルやクラウドログでアプリの実行状況を確認可能です。
処理時間の計測に time モジュールを使う
import time
start = time.time()
# 重い処理
time.sleep(2)
end = time.time()
st.write(f"処理時間: {end - start:.2f} 秒")7.5 パフォーマンス最適化の具体的ヒント
| 項目 | 最適化方法 |
|---|---|
| データ読み込み | st.cache_data() や pandas.read_parquet() で高速化 |
| イベント処理 | on_click や st.session_state を使って再実行を局所化 |
| グラフ描画 | 大量データは要約 or ダウンサンプリングして描画 |
| 非同期処理 | threading を使って処理をバックグラウンドで実行 |
| 画像処理 | PILなどを用いた事前のリサイズで処理負荷軽減 |
| レイアウト | 不要な st.container() や st.columns() の多用は避ける |
7.6 サンプル:重い画像処理をキャッシュ+非同期で最適化
from PIL import Image, ImageFilter
import streamlit as st
import threading
@st.cache_data
def load_image():
return Image.open("sample.jpg")
def process_image():
image = load_image()
blurred = image.filter(ImageFilter.GaussianBlur(5))
st.session_state["processed"] = blurred
if st.button("画像処理開始"):
threading.Thread(target=process_image).start()
st.session_state["processed"] = None
if "processed" in st.session_state and st.session_state["processed"]:
st.image(st.session_state["processed"], caption="処理後の画像")- 画像読み込みはキャッシュし、処理部分は非同期化。
- ユーザーの操作性を保ちつつ重たい処理の完了を待たずにページが応答します。
パフォーマンス最適化のヒント
Streamlitは再実行ベースの仕組みゆえに、不要な処理やリソースの重複利用がボトルネックになりがちです。そこで、
st.cache_dataで再実行を抑制- スレッドによるバックグラウンド処理
- ログや計測によるデバッグ
- 大規模データの要約・圧縮
などの工夫を取り入れることで、高レスポンスかつスケーラブルなアプリケーションを構築できます。特に業務利用や社内ツールとして運用する場合は、チューニングを意識した設計が重要です。
8. まとめ
この記事では、ストリームリットのスケーラビリティ問題とパフォーマンス最適化の方法について学びました。ユーザーごとのセッション管理とデータ競合を理解し、適切な手法を採用することで、よりスムーズで効率的なアプリケーション運用が可能です。今後の開発において、これらの知識を活用し、より高性能なストリームリットアプリを構築してみてください。
参考
最後まで読んでいただきありがとうございます。
ご意見ご感想はコメント欄へお願いします。

