HTTP APIで使えるシンプルな推論サーバをFlaskで作る

推論サーバをFlaskで構築する Python
推論サーバをFlaskで構築する
この記事は約25分で読めます。

こんにちは、JS2IIUです。
機械学習モデルを実際のアプリケーションで利用するためには、モデルをAPI化し、外部から推論リクエストを受け取れるようにする必要があります。例えば、Webアプリやモバイルアプリから画像やテキストを送信し、その結果を返すといった仕組みです。

このような「モデルのAPI化」には、FlaskやFastAPIといった軽量なPythonフレームワークがよく使われます。本記事では、Flaskを用いて学習済みPyTorchモデルをHTTP APIとして公開する方法を、初学者にもわかりやすく解説します。

また、推論エンドポイント /predict の実装だけでなく、バッチ推論の処理方法スレッド安全性の確保、そして簡単な負荷テスト方法についても取り上げます。

本記事を読み終えるころには、自分のモデルをFlaskサーバとしてデプロイし、HTTP経由で推論を呼び出す方法を理解できるようになります。今回もよろしくお願いします。

FlaskとHTTP APIの基礎

まずはFlaskの基本から確認しましょう。FlaskはPythonで書かれた軽量なWebフレームワークで、数行のコードでHTTPサーバを立ち上げることができます。HTTP API(Application Programming Interface)は、アプリケーション間でデータをやり取りするための仕組みで、リクエストとレスポンスで構成されます。

以下のコードは、最も基本的なFlaskサーバの例です。

Python
from flask import Flask, jsonify

app = Flask(__name__)

@app.route('/')
def hello():
    return jsonify({'message': 'Hello, Flask API!'})

if __name__ == '__main__':
    app.run(debug=True)

このスクリプトを実行すると、http://127.0.0.1:5000/ にアクセスした際に "Hello, Flask API!" というJSONレスポンスが返ります。
@app.route('/') はエンドポイントを定義するデコレータで、HTTPリクエストを受け取ったときの処理を指定します。

これを応用して、学習済みモデルにデータを渡し、予測結果を返すAPIを構築します。

Flaskに関する関連情報リンク

学習済みPyTorchモデルを読み込む

次に、推論に使うモデルをPyTorchで読み込みます。ここでは、説明を簡単にするため、事前に保存されたダミーの線形モデルを使用します。

Python
import torch
import torch.nn as nn

# シンプルな線形モデルを定義
class SimpleModel(nn.Module):
    def __init__(self):
        super(SimpleModel, self).__init__()
        self.linear = nn.Linear(3, 1)  # 入力3次元、出力1次元

    def forward(self, x):
        return self.linear(x)

# 学習済みモデルの読み込み(ここではダミー)
model = SimpleModel()
model.load_state_dict(torch.load('simple_model.pt', map_location='cpu'))
model.eval()  # 推論モードに設定

model.eval() を呼び出すことで、ドロップアウトやバッチ正規化などが推論用の動作になります。
このようにモデルをロードしたあと、Flaskサーバ内で利用できるように保持します。

ダミーのsimple_model.ptを作成する方法

テストや記事のハンズオンで手早く動作を確認したい場合は、あらかじめ簡単なダミーモデル(あるいは短時間学習したモデル)の重みを simple_model.pt として保存しておくと便利です。ここでは素早く試せる2つの方法(1. 再現性のある「固定重み」ダミー、2. 小さなデータで少しだけ学習して保存)を示します。どちらもローカル環境で数秒〜数分で作成できます。

1) 最速:固定重みのダミーモデルを作る(テスト向け)

入力形状やモデル定義が本体の Flask コードと一致していれば、推論フローや API レスポンスの確認だけを行う目的で固定重みを保存するのが最も手早いです。再現性のために重みを定数で埋めています。

save_dummy_model.py:

Python
import torch
import torch.nn as nn

class SimpleModel(nn.Module):
    def __init__(self):
        super().__init__()
        self.linear = nn.Linear(3, 1)

    def forward(self, x):
        return self.linear(x)

if __name__ == "__main__":
    model = SimpleModel()
    # 再現性のために重みを固定値で埋める
    with torch.no_grad():
        model.linear.weight.fill_(0.1)
        model.linear.bias.fill_(0.0)

    # state_dict を保存するのが推奨
    torch.save(model.state_dict(), "simple_model.pt")
    print("Saved simple_model.pt (state_dict)")

ターミナルで実行:

Bash
python3 save_dummy_model.py

これで同ディレクトリに simple_model.pt が作成され、記事内の Flask サンプルでそのまま読み込めます。

2) 少しだけ学習して保存する(実データに近い挙動を確認したい場合)

実際の学習済み重みで挙動を確認したい場合は、小さなダミーデータで短時間だけ学習して state_dict を保存します。モデルの保存方法は実運用と同じにしておくと移行がスムーズです。

train_and_save.py:

Python
import torch
import torch.nn as nn
import torch.optim as optim

class SimpleModel(nn.Module):
    def __init__(self):
        super().__init__()
        self.linear = nn.Linear(3, 1)

    def forward(self, x):
        return self.linear(x)

if __name__ == "__main__":
    # ダミーデータ(入力:3次元 -> 出力:1次元)
    X = torch.tensor([[1.0, 2.0, 3.0],
                      [0.5, 0.2, 0.1],
                      [2.0, 1.0, 0.0]], dtype=torch.float32)
    y = torch.tensor([[1.0], [0.5], [1.8]], dtype=torch.float32)

    model = SimpleModel()
    optimizer = optim.SGD(model.parameters(), lr=0.1)
    loss_fn = nn.MSELoss()

    for epoch in range(200):
        optimizer.zero_grad()
        preds = model(X)
        loss = loss_fn(preds, y)
        loss.backward()
        optimizer.step()

    torch.save(model.state_dict(), "simple_model.pt")
    print("Trained (briefly) and saved simple_model.pt")

実行:

Bash
python3 train_and_save.py

短時間学習でも、入力に対する出力の分布がより現実に近くなり、API のエンドツーエンド検証がやりやすくなります。

推奨と注意点

  • 保存形式は torch.save(model.state_dict(), PATH)(state_dict)を推奨します。可搬性とセキュリティ面で有利です。
  • GPU で保存したモデルを CPU 環境で読み込む場合は、読み込み時に必ず map_location=’cpu’ を使うか、保存前に model.to(‘cpu’) してください。
  • モデル全体を pickle で保存する torch.save(model, PATH) は手軽ですが、読み込み時にコード互換性やセキュリティ(任意コード実行リスク)に注意が必要です。
  • 保存時にメタデータを一緒に保存しておくと後々便利です(入力サイズや学習エポック、作成日など)。例:
Python
torch.save({
    'model_state': model.state_dict(),
    'input_size': 3,
    'notes': 'dummy model for demo'
}, 'simple_model_bundle.pt')

動作確認(Flask との接続テスト)

記事の Flask サンプルと同じディレクトリに simple_model.pt を置いたら、Flask アプリを起動して以下の curl で推論が返るか確認します。

Bash
curl -X POST -H "Content-Type: application/json" \
  -d '{"inputs": [[1.0, 2.0, 3.0]]}' \
  http://127.0.0.1:5000/predict

期待通り JSON が返れば成功です。

上のスクリプトは記事の読者がすぐに試せるように最小限にまとめています。実運用を想定する場合は、モデルのバージョン管理(ファイル名にバージョンを含める、メタデータで保存する)、認証、外部から供給されたモデルの検証などを追加してください。

/predict エンドポイントの実装

次に、Flaskサーバに /predict エンドポイントを追加し、クライアントから送られてきたデータをモデルに入力して予測結果を返すようにします。

Python
from flask import Flask, request, jsonify
import torch

app = Flask(__name__)

# すでにロード済みのモデルを使用
model = SimpleModel()
model.load_state_dict(torch.load('simple_model.pt', map_location='cpu'))
model.eval()

@app.route('/predict', methods=['POST'])
def predict():
    # クライアントから送られたJSONデータを取得
    data = request.get_json()

    # 入力データの検証
    if 'inputs' not in data:
        return jsonify({'error': 'Missing "inputs" key'}), 400

    # テンソルに変換
    inputs = torch.tensor(data['inputs'], dtype=torch.float32)

    # 推論を実行(勾配は不要)
    with torch.no_grad():
        outputs = model(inputs)

    # 結果をPythonのリストに変換して返す
    predictions = outputs.numpy().tolist()
    return jsonify({'predictions': predictions})

このエンドポイントは、JSON形式の入力を受け取り、推論結果をJSONで返します。
例えば、次のようなリクエストを送ると動作します。

Bash
curl -X POST -H "Content-Type: application/json" \
    -d '{"inputs": [[1.0, 2.0, 3.0], [0.5, 0.2, 0.1]]}' \
    http://127.0.0.1:5000/predict

出力は次のような形式になります。

JSON
{
  "predictions": [[2.345], [0.983]]
}

このように、Flaskはリクエストデータの処理やレスポンスの返却をシンプルに記述できます。

バッチ推論とスレッド安全性

サーバは同時に複数のリクエストを受け取る可能性があります。そのため、スレッド安全性を確保しつつ、複数データをまとめて処理する「バッチ推論」を行う設計が望ましいです。

PyTorchのモデルはスレッド安全ではない場合があるため、グローバルロックを使う方法があります。

Python
import threading

model_lock = threading.Lock()

@app.route('/predict', methods=['POST'])
def predict():
    data = request.get_json()
    inputs = torch.tensor(data['inputs'], dtype=torch.float32)

    with model_lock:  # 同時アクセスを防ぐ
        with torch.no_grad():
            outputs = model(inputs)

    return jsonify({'predictions': outputs.numpy().tolist()})

このようにロックを導入することで、同時リクエストが発生しても安全に処理できます。
Flaskを本番環境で動かす際は、Gunicorn のようなWSGIサーバを利用してマルチワーカーで動作させることが一般的です。

起動例:

Bash
gunicorn -w 4 app:app

ここで -w 4 はワーカー数を示し、同時に4つのリクエストを処理できます。

簡単な負荷テストを行う

API を構築したら、並列リクエストや高い同時接続数に対してサービスがどのように振る舞うかを確認する「負荷テスト(ロードテスト)」を行いましょう。

負荷テストは「何に負荷をかけるか」を明確にするところから始めます。代表的な対象は次の通りです

  • モデルの推論処理(CPU / GPU 使用率、推論レイテンシ)
  • Web サーバ(Flask/Gunicorn)の処理能力(ワーカー数・スレッド数、イベントループ)
  • ネットワーク(帯域やレイテンシ)
  • I/O(ファイル読み書き、モデル読み込み、外部 DB 呼び出し)
  • メモリ(同時接続が増えたときのメモリ使用量、OOM の有無)

負荷をかけると何が起きるか(典型的な挙動)

  • レイテンシの増加(平均応答時間が伸びる)
  • スループットの頭打ち(秒間リクエスト数が一定値で張り付く)
  • タイムアウトや 5xx エラーの増加(処理が追いつかず失敗が出る)
  • キューの蓄積(リクエストが待たされて遅延が累積する)
  • ワーカー / スレッドの枯渇やメモリ不足(最悪プロセス落ち)

これらの現象を観測するため、負荷テストでは下記の指標を取得して比べます。

  • レイテンシ分布(平均、p50, p95, p99)
  • スループット(RPS: requests per second)
  • エラー率(HTTP 5xx/4xx)
  • CPU / GPU 利用率、メモリ使用量
  • 接続状況(同時接続数、ワーカの状態)

スレッドとスレッドプールの概念は以下の通りです。

  • スレッド: OS/ランタイム上で並列に実行される「軽量な実行単位」です。Python の場合、CPython ではグローバルインタプリタロック(GIL)の影響で純粋な CPU バウンド処理の並列化には制限がありますが、I/O バウンド処理(ネットワークやディスクアクセス)ではスレッドが有効です。
  • スレッドプール: 何度もスレッドを作成・破棄するコストを避けるために、予め決められた数のスレッドを用意してタスクを割り当てる仕組みです。Python 標準の concurrent.futures.ThreadPoolExecutor はその一例で、簡単に並列リクエスト送信などを実装できます。

注: サーバ側(Flask)を本番で動かす場合、Gunicorn のような WSGI サーバで「ワーカー(プロセス)」と「各ワーカーのスレッド数」を組み合わせてチューニングするのが一般的です。CPU バウンドな推論処理はプロセス単位で分散する方が効果的なケースがあります。

簡単な負荷テストの実例(Python スクリプト)

以下は記事の範囲で気軽に試せるスクリプトです。ローカルの Flask サーバ(記事の /predict)に対して並列にリクエストを投げ、結果を集めます。

Python
import requests
import concurrent.futures
import time

def send_request(i):
    url = "http://127.0.0.1:5000/predict"
    payload = {"inputs": [[1.0, 2.0, 3.0]]}
    start = time.time()
    try:
        r = requests.post(url, json=payload, timeout=5)
        elapsed = time.time() - start
        return {'idx': i, 'status': r.status_code, 'elapsed': elapsed, 'body': r.json() if r.ok else None}
    except Exception as e:
        return {'idx': i, 'status': 'error', 'error': str(e)}

if __name__ == '__main__':
    # 同時に送るスレッド数(ここは増やして試す)
    CONCURRENCY = 10
    with concurrent.futures.ThreadPoolExecutor(max_workers=CONCURRENCY) as ex:
        futures = [ex.submit(send_request, i) for i in range(CONCURRENCY)]
        results = [f.result() for f in concurrent.futures.as_completed(futures)]

    # 結果の簡易集計
    successes = [r for r in results if r.get('status') == 200]
    errors = [r for r in results if r.get('status') != 200]
    latencies = [r['elapsed'] for r in successes]
    print(f"total: {len(results)}, success: {len(successes)}, errors: {len(errors)}")
    if latencies:
        print(f"avg latency: {sum(latencies)/len(latencies):.3f}s, p95: {sorted(latencies)[int(len(latencies)*0.95)-1]:.3f}s")
    print(errors)

使い方のヒント

  • 最初は小さな並列数(例: 5, 10)から始め、徐々に増やしていきます。並列数を増やしたときにレイテンシやエラーが急増するポイントが「ボトルネックのしきい値」です。
  • サーバのログ、top / htopnvidia-smi(GPU 使用時)などで CPU/GPU/メモリの状況を同時に監視してください。
  • より本格的に測る場合は wrk, hey, ab(ApacheBench)など専用ツールを使うと詳細な RPS/レイテンシ分布が取れます。

どう解釈するか?

  • レイテンシが安定している=現在の構成で十分。ただしピーク時の余裕は別に評価する必要あり。
  • p95/p99 が大きく伸びる=一部リクエストがキューで待たされている(スローハンドラや I/O 待ちの存在)。
  • CPU/GPU が 100% 近くで張り付く=計算負荷がボトルネック。ワーカー増加やバッチ化、より速いハードウェア検討。
  • メモリ・OOM 発生=メモリ使用量を削る、バッチサイズや同時接続数を制限する、プロセス再起動対策を検討。

最後に、本番運用では負荷試験は単発で終わらせず、デプロイ前の回帰テストや負荷変動シナリオ(長時間テスト、スパイクテスト)も組み合わせて実施することをおすすめします。

まとめ

本記事では、Flaskを使って学習済みPyTorchモデルをHTTP APIとして提供する方法を解説しました。
主なポイントを振り返ります。

  1. Flaskは軽量でシンプルなWebフレームワークであり、少ないコードでAPIを構築できる。
  2. PyTorchモデルを model.eval() で推論モードにし、/predict エンドポイントでHTTP経由の入力を処理できる。
  3. 同時アクセスに備えて threading.Lock を使いスレッド安全性を確保する。
  4. 本番運用にはGunicornなどのWSGIサーバを使うことで安定したスケーラビリティを実現できる。
  5. 簡単な負荷テストを行うことで、サーバの性能を検証できる。

このように、Flaskを使えばPythonで手軽に推論サーバを構築でき、AIモデルを外部アプリケーションから利用できるようになります。
次のステップとして、FastAPIによる非同期推論やDockerでのデプロイにも挑戦してみるとよいでしょう。

最後まで読んでいただきありがとうございました。

コメント

タイトルとURLをコピーしました