Dash応用編:第3回 データの動的更新とストリーミング

Python
この記事は約8分で読めます。

こんにちは、JS2IIUです。少し間が空いてしまいましたが、第3回の記事を投稿します。よろしくお願いします。

はじめに

第3回では、リアルタイムでデータを動的に更新・表示する方法について取り上げます。これにより、ライブデータを表示したり、ユーザーの操作に応じてデータが逐次更新されるダッシュボードを構築できるようになります。特に、株価やセンサーデータなどのストリーミングデータを扱う際に非常に役立つ手法です。

この記事では、Dashでの動的データ更新の基本的な実装方法から始め、ストリーミングデータの扱い方、WebSocketや外部APIと連携する方法までをカバーします。

Dashでの動的更新:dcc.Intervalの利用

動的データ更新を行うための基本的なコンポーネントがdcc.Intervalです。このコンポーネントは一定の間隔でイベントをトリガーし、そのイベントに応じて他のコンポーネントを更新します。

次に、シンプルな動的更新の例を見てみましょう。下記の例では、現在の時刻が1秒ごとに更新されます。ごめんなさい、小さく切り抜いたので逆に文字がでっかくなっちゃいました。

import dash
from dash import dcc, html
from dash.dependencies import Input, Output
import datetime

app = dash.Dash(__name__)

app.layout = html.Div([
    html.H1('現在の時刻'),
    html.Div(id='live-update-text'),
    dcc.Interval(
        id='interval-component',
        interval=1*1000,  # 1秒ごとに更新(ミリ秒単位)
        n_intervals=0
    )
])

@app.callback(
    Output('live-update-text', 'children'),
    [Input('interval-component', 'n_intervals')]
)
def update_time(n):
    return f"現在の時刻: {datetime.datetime.now().strftime('%H:%M:%S')}"

if __name__ == '__main__':
    app.run_server(debug=True)

このアプリケーションでは、dcc.Intervalコンポーネントを使って1秒ごとにn_intervalsが増加し、それをトリガーとして現在の時刻を更新しています。dcc.Intervalは、特定のタイミングでコールバックを実行するための仕組みとして非常に有効です。

動的なグラフの更新

次に、動的に更新されるグラフの例を見ていきます。この例では、1秒ごとに新しいデータポイントが追加され、グラフがリアルタイムで更新されます。

import dash
from dash import dcc, html
from dash.dependencies import Input, Output
import plotly.graph_objs as go
import random

app = dash.Dash(__name__)

# 初期データを設定
X = [0]
Y = [random.randint(0, 10)]

app.layout = html.Div([
    dcc.Graph(id='live-graph'),
    dcc.Interval(
        id='graph-update',
        interval=1*1000,  # 1秒ごとに更新
        n_intervals=0
    )
])

@app.callback(
    Output('live-graph', 'figure'),
    [Input('graph-update', 'n_intervals')]
)
def update_graph(n):
    global X, Y
    X.append(X[-1] + 1)
    Y.append(random.randint(0, 10))

    # グラフを更新
    data = go.Scatter(
        x=X,
        y=Y,
        mode='lines+markers'
    )

    return {'data': [data],
            'layout': go.Layout(xaxis=dict(range=[min(X), max(X)]),
                                yaxis=dict(range=[0, 10]))}

if __name__ == '__main__':
    app.run_server(debug=True)

この例では、毎秒ランダムな値を生成し、dcc.Graphでその値をプロットしています。グラフはリアルタイムでデータが追加され、更新される仕組みです。dcc.Intervalを使って、定期的に新しいデータポイントを取得し、グラフに反映しています。

外部APIからのストリーミングデータの取得

次に、リアルタイムデータを外部APIから取得し、Dashで表示する方法を見ていきます。ここでは、仮想的なAPIからJSONデータを取得して表示する例を取り上げます。

import dash
from dash import dcc, html
from dash.dependencies import Input, Output
import requests

app = dash.Dash(__name__)

app.layout = html.Div([
    html.H1('リアルタイムデータ'),
    html.Div(id='api-data'),
    dcc.Interval(
        id='interval-component',
        interval=5*1000,  # 5秒ごとにAPIを呼び出し
        n_intervals=0
    )
])

@app.callback(
    Output('api-data', 'children'),
    [Input('interval-component', 'n_intervals')]
)
def update_api_data(n):
    # 仮想的なAPIからデータを取得
    response = requests.get("https://jsonplaceholder.typicode.com/todos/1")
    data = response.json()
    return f"ID: {data['id']}, タイトル: {data['title']}"

if __name__ == '__main__':
    app.run_server(debug=True)

このコードでは、仮想的なAPI (jsonplaceholder.typicode.com) から5秒ごとにデータを取得し、アプリケーションに表示しています。リアルタイムで変化するAPIデータをDashに取り込むことで、ライブダッシュボードを作成することができます。

WebSocketを使ったストリーミング

WebSocketを使用することで、サーバーとクライアント間で双方向のリアルタイム通信が可能です。DashはデフォルトではWebSocketをサポートしていないため、dash-extensionsパッケージを利用します。

pip install dash-extensions

次に、dash-extensionsを使ったWebSocketのストリーミング例です。

from dash import dcc, html
import dash
from dash_extensions import WebSocket

app = dash.Dash(__name__)

app.layout = html.Div([
    WebSocket(id="ws", url="wss://example.com/stream"),  # WebSocketの接続先
    html.Div(id="ws-data")
])

@app.callback(
    Output("ws-data", "children"),
    [Input("ws", "message")]
)
def update_ws_data(message):
    # WebSocketから受信したデータを表示
    return f"受信したデータ: {message['data']}"

if __name__ == '__main__':
    app.run_server(debug=True)

この例では、WebSocketを利用して外部のストリーミングサービスからデータを取得し、リアルタイムで更新しています。wss://example.com/streamは仮のWebSocketサーバーで、実際の使用時には適切なURLを指定する必要があります。

大量データの処理とパフォーマンスの最適化

リアルタイムで更新されるデータが増えると、処理が重くなり、パフォーマンスの問題が発生することがあります。これに対処するためには、以下のようなテクニックを活用できます。

  1. データ量の制限: 最新のデータだけを表示するようにし、古いデータは表示から外す(FIFOバッファ)。
  2. コールバックの最適化: コールバックで必要なデータだけを処理するようにする。
  3. WebSocketやAPIの最適化: サーバーとの通信量を減らし、必要なタイミングでのみデータを取得する。

まとめ

この記事では、Dashを使ったデータの動的更新とストリーミングの実装方法について紹介しました。dcc.Intervalを使った定期的なデータ更新や、外部API、WebSocketとの連携によって、リアルタイムでデータを取得し、ダッシュボードに反映させる方法を学びました。次回は「データのフィルタリングと検索機能」を取り上げ、インタラクティブなデータ操作の方法を紹介します。

参考リンク

最後まで読んでいただきありがとうございます。次回もお楽しみに!

コメント

タイトルとURLをコピーしました