【Python】非同期プログラミング入門:asyncioを使った効率的なコーディング

Python
この記事は約16分で読めます。

こんにちは、JS2IIUです。

非同期処理を実現するasyncioについて事例を紹介します。今回もよろしくお願いします。

1. はじめに

Pythonで複数の処理を同時に進めたいとき、非同期プログラミングが有効です。特にI/O操作(API通信、ファイル入出力、データベースアクセスなど)では、待ち時間を有効に使い、パフォーマンスを向上させることができます。

本記事では、Pythonの標準ライブラリであるasyncioを使った非同期プログラミングを、実践的なユースケースを交えながら解説します。

2. asyncioの基礎知識

2.1 asyncとawaitの基本

asyncawaitは非同期処理を記述するためのPythonのキーワードです。

  • async:非同期関数を定義する。
  • await:非同期処理の完了を待つ。

2.2 簡単な非同期関数の例

Python
import asyncio

async def say_hello():
    await asyncio.sleep(2)
    print("Hello, asyncio!")

asyncio.run(say_hello())

このコードでは、asyncio.sleep(2)で2秒間待機し、その後メッセージを出力します。

asyncioモジュールのインポート

Python
import asyncio
  • Python標準ライブラリに含まれる非同期処理を実現するためのモジュールです。
  • asyncioを使用することで、I/O操作(ネットワーク通信、ファイル処理など)を効率的に実行できます。

非同期関数の定義

Python
async def say_hello():
  • asyncキーワードを付けることで、非同期関数(コルーチン)を定義します。
  • 通常のdefで作成した関数は同期的に実行されますが、async defを使うと非同期実行が可能になります。

コルーチン(Coroutine)とは?

  • 一時停止と再開が可能な特殊な関数です。
  • Pythonのasync/await構文はこのコルーチンを操作します。

awaitによる非同期処理

Python
await asyncio.sleep(2)
  • awaitは非同期関数内でのみ使えるキーワードです。
  • asyncio.sleep(2)は、2秒間待機する非同期関数です。
    • time.sleep()は同期関数で、処理をブロック(停止)しますが、asyncio.sleep()は非同期的に待機します。
    • 他のタスクがある場合、この2秒の間に他の処理を実行できます。

ポイント

  • awaitを使うことで、他のタスクに処理を譲りながら待機できます。
  • これにより、CPUがアイドル状態になるのを防ぎ、リソースを有効活用できます。

メッセージの出力

Python
print("Hello, asyncio!")
  • 2秒待機した後に、コンソールにHello, asyncio!を出力します。

asyncio.run()でコルーチンを実行

Python
asyncio.run(say_hello())
  • asyncio.run()はPython 3.7以降で導入された便利な関数で、非同期コードを実行します。
  • say_hello()コルーチンをイベントループで実行し、終了するまで待機します。

イベントループとは?

  • 非同期処理を管理・実行する仕組みです。
  • asyncio.run()は内部でイベントループを生成し、コルーチンを実行します。

プログラムの実行フロー

  1. asyncio.run(say_hello())でイベントループが作成され、say_hello()が実行開始。
  2. await asyncio.sleep(2)で2秒間の非同期待機を行う(この間、他のタスクがあれば実行可能)。
  3. 2秒経過後にprint("Hello, asyncio!")が実行され、コンソールにメッセージが表示される。
  4. コルーチンが終了し、イベントループが閉じられる。

3. ユースケース1:複数のAPIからデータを取得

複数のAPIを順番に呼び出すと時間がかかりますが、asyncioを使うと同時に実行でき、時間を節約できます。

3.1 同期処理の場合

Python
import requests
import time

def fetch_url(url):
    response = requests.get(url)
    return response.text

start = time.time()

for url in ["https://example.com", "https://example.org"]:
    print(fetch_url(url))

end = time.time()
print(f"Elapsed time: {end - start:.2f} seconds")

このコードを実行すると、各URLから取得した内容と処理時間が以下のように表示されます。

Bash
<!doctype html>
<html>...</html>
<!doctype html>
<html>...</html>
Elapsed time: 4.02 seconds

3.2 非同期処理の場合

aiohttpを使用して非同期でAPIを取得します。

Python
import aiohttp
import asyncio
import time

async def fetch_url(session, url):
    async with session.get(url) as response:
        return await response.text()

async def main():
    urls = ["https://example.com", "https://example.org"]
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_url(session, url) for url in urls]
        results = await asyncio.gather(*tasks)
        for result in results:
            print(result)

start = time.time()
asyncio.run(main())
end = time.time()
print(f"Elapsed time: {end - start:.2f} seconds")

このコードでは、aiohttp.ClientSession()を使って複数のURLを並行して取得しています。

ライブラリのインポート

Python
import aiohttp
import asyncio
import time
  • aiohttp:非同期HTTPクライアントライブラリ。HTTPリクエスト(GET、POSTなど)を非同期に実行できます。
  • asyncio:Pythonの標準非同期フレームワーク。複数の非同期処理を管理します。
  • time:処理時間を計測するために使用します。

URLからデータを取得する関数

Python
async def fetch_url(session, url):
    async with session.get(url) as response:
        return await response.text()
関数の仕組み
  1. async def
  • 非同期関数を定義するために使用します。通常のdefではなくasync defを使うことで、関数をコルーチンとして扱います。
  1. session.get(url)
  • aiohttp.ClientSessionGETリクエストを非同期で送信します。
  • async withを使うことで、HTTPセッションを安全に開閉できます(リソースリーク防止)。
  1. await response.text()
  • レスポンスの本文を非同期に取得します。通常の.text()ではなくawaitを使うことで、I/O待ちを他の処理に譲れます。

💡 ポイント
この関数は1つのURLからデータを取得する処理を非同期で行います。


メイン関数

Python
async def main():
    urls = ["https://example.com", "https://example.org"]
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_url(session, url) for url in urls]
        results = await asyncio.gather(*tasks)
        for result in results:
            print(result)
各行の説明
  1. urlsリストの定義
Python
   urls = ["https://example.com", "https://example.org"]
  • 取得したいURLをリストに格納しています。
  1. async with aiohttp.ClientSession()
Python
   async with aiohttp.ClientSession() as session:
  • ClientSessionを作成し、HTTPセッションを開始します。
  • async withを使用すると、終了時に自動的にセッションが閉じられるため、明示的にsession.close()を呼ぶ必要はありません。
  1. タスクの生成
Python
   tasks = [fetch_url(session, url) for url in urls]
  • リスト内包表記を使用し、各URLに対してfetch_url()を呼び出す非同期タスクを作成。
  • タスクとは、非同期に実行される処理の単位です。
  1. 非同期実行の開始
Python
   results = await asyncio.gather(*tasks)
  • asyncio.gather()は、複数のコルーチンを並列に実行し、すべての結果をリストで返します。
  • awaitを使うことで、すべてのタスクが完了するまで待機します。
  1. 結果の出力
Python
   for result in results:
       print(result)
  • 各URLから取得したデータを順に出力します。

💡 重要なポイント

  • すべてのURLを同時にリクエストするため、リクエスト時間の最も長いURLに依存した待ち時間になります。
  • 同期処理では各URLを順番に処理するため、トータル時間が増えますが、ここでは並列で処理するため高速です。

時間計測とプログラムの実行

Python
start = time.time()
asyncio.run(main())
end = time.time()
print(f"Elapsed time: {end - start:.2f} seconds")
処理の流れ
  1. start = time.time()
  • 現在のUNIXタイムスタンプを取得し、処理開始時刻を記録します。
  1. asyncio.run(main())
  • メイン関数を非同期で実行します。Python 3.7以降で導入されたメソッドで、簡潔に非同期処理を実行できます。
  • イベントループが自動で作成・管理され、終了後にクリーンアップも行われます。
  1. end = time.time()
  • 処理終了時の時刻を取得。
  1. 処理時間を表示
Python
   print(f"Elapsed time: {end - start:.2f} seconds")
  • 経過時間を小数点以下2桁で表示。

実行結果

通常、2つのURLからデータを取得し、処理時間を表示します。

Plaintext
<!DOCTYPE html>
<html>...</html>
<!DOCTYPE html>
<html>...</html>
Elapsed time: 1.02 seconds

4. ユースケース2:大量のファイルを同時に処理

asyncio.to_thread()を使えば、CPUバウンドの処理も非同期で実行できます。

4.1 ファイルの非同期読み込み

Python
import asyncio

def read_file(file_path):
    with open(file_path, 'r') as file:
        return file.read()

async def main():
    file_paths = ["file1.txt", "file2.txt"]
    tasks = [asyncio.to_thread(read_file, path) for path in file_paths]
    results = await asyncio.gather(*tasks)
    for content in results:
        print(content)

asyncio.run(main())

read_file関数

Python
def read_file(file_path):
    with open(file_path, 'r') as file:
        return file.read()
説明
  • 引数file_path → 読み込むファイルのパス。
  • 処理内容
    1. ファイルを開くopen()関数を使用)。
    2. ファイルの内容を読み込みfile.read())。
    3. ファイル内容を返すreturn)。
ポイント
  • 同期関数(普通のdefで定義)であり、通常はブロッキング(I/O待ちでプログラムが停止する)処理を行います。

main関数

Python
async def main():
    file_paths = ["file1.txt", "file2.txt"]
    tasks = [asyncio.to_thread(read_file, path) for path in file_paths]
    results = await asyncio.gather(*tasks)
    for content in results:
        print(content)
説明
  1. ファイルパスのリスト作成
Python
   file_paths = ["file1.txt", "file2.txt"]
  • 読み込む対象のファイルをリストで定義。
  1. 非同期タスクの作成
Python
   tasks = [asyncio.to_thread(read_file, path) for path in file_paths]
  • asyncio.to_thread()を使用して、read_file関数をスレッドプールで実行。
  • I/Oバウンド(入出力待ち時間が長い)な処理を非同期化。 asyncio.to_thread()の特徴
  • 同期関数を簡単に非同期タスクに変換。
  • CPUを使用しないI/O操作(ファイル読み込み・書き込み・ネットワーク通信など)に適している。
  1. 複数の非同期タスクを並行実行
Python
   results = await asyncio.gather(*tasks)
  • asyncio.gather()を使い、複数の非同期タスクを並行実行し、すべて完了するのを待つ。
  • await非同期タスクの完了を待機し、結果をresultsに格納。
  1. ファイル内容を出力
Python
   for content in results:
       print(content)
  • 取得した各ファイルの内容を順に表示。

5. ユースケース3:チャットボットの非同期実装

複数のユーザーからの入力を非同期に処理するチャットボットを作成します。

5.1 非同期チャットボットの例

Python
import asyncio

async def chat_bot(user, delay):
    await asyncio.sleep(delay)
    print(f"{user}: Hello!")

async def main():
    users = [("User1", 3), ("User2", 1), ("User3", 2)]
    tasks = [chat_bot(user, delay) for user, delay in users]
    await asyncio.gather(*tasks)

asyncio.run(main())

このプログラムは複数のユーザー入力を並行して処理します。

chat_bot関数

Python
async def chat_bot(user, delay):
    await asyncio.sleep(delay)
    print(f"{user}: Hello!")
説明
  • 引数
  • user:ユーザー名(例: "User1")。
  • delay:メッセージを出力するまでの待機時間(秒数)。
  • 処理の流れ
  1. 非同期に待機await asyncio.sleep(delay)
    • awaitを使って指定された秒数だけ一時停止
    • 他のタスクはこの間も並行して実行されるため、処理が効率的に進む。
  2. メッセージ出力
    python print(f"{user}: Hello!")
    • 待機時間が終了すると、userに対応するメッセージを出力

main関数

Python
async def main():
    users = [("User1", 3), ("User2", 1), ("User3", 2)]
    tasks = [chat_bot(user, delay) for user, delay in users]
    await asyncio.gather(*tasks)
説明
  1. ユーザーと遅延時間のリストを作成
Python
   users = [("User1", 3), ("User2", 1), ("User3", 2)]
  • 3人のユーザーと、対応する遅延時間をリストで定義。
  1. 非同期タスクの作成
Python
   tasks = [chat_bot(user, delay) for user, delay in users]
  • リスト内包表記を使い、各ユーザーに対してchat_bot関数を呼び出す非同期タスクを生成。
  1. 複数のタスクを同時実行
Python
   await asyncio.gather(*tasks)
  • asyncio.gather()で複数の非同期タスクを並行して実行。
  • すべてのタスクが完了するまで待機。

プログラムの実行

Python
asyncio.run(main())
説明
  • asyncio.run()は、非同期プログラムを実行するためのエントリーポイント。
  • main()関数を呼び出し、イベントループを作成して非同期処理を開始。

6. エラーハンドリングとデバッグ

非同期処理でも例外処理を適切に行うことが重要です。

6.1 asyncio.TimeoutErrorの処理

Python
import asyncio

async def slow_task():
    await asyncio.sleep(5)

async def main():
    try:
        await asyncio.wait_for(slow_task(), timeout=3)
    except asyncio.TimeoutError:
        print("Timeout occurred")

asyncio.run(main())

7. ベストプラクティス

  • I/Oバウンド処理に特化asyncioはI/O操作(ネットワーク、ファイル処理など)に有効。
  • 同期と非同期を混在させない:同期コードを非同期コードに適切に変換。
  • 例外処理を忘れないasyncio.wait_for()でタイムアウト管理をする。

8. まとめと次のステップ

本記事では、Pythonのasyncioを使った非同期プログラミングの基礎と、実践的なユースケースを解説しました。

さらなる学習リソース

非同期プログラミングをマスターして、より効率的なPythonコードを目指しましょう!

最後に、書籍のPRです。
24年9月に出版された「ハイパーモダンPython-信頼性の高いワークフローを構築するモダンテクニック」、Claudio Jolowicz著、嶋田、鈴木訳。開発環境の構築、プロジェクトの管理、テストに関して実践的な内容でとても参考になる一冊です。ぜひ手に取ってみてください。

最後まで読んでいただきありがとうございます。

コメント

タイトルとURLをコピーしました