【第9回】AI記事作成を非同期イベント駆動型へ発展させる

スポンサーリンク
この記事は約53分で読めます。

第8回では、Event Queue と Dispatcher を導入し、AI記事作成システムをイベントキュー型の構成へ発展させました。

第9回では、この仕組みをさらに進め、asyncio.Queue と Worker を使った非同期イベント駆動型の構成に変更します。

第9回AI記事作成の処理イメージ

main_async.py

asyncio.Queue を作成

process.started を Queue に投入

Workerを複数起動

WorkerがQueueからイベントを取得

AsyncDispatcherがevent/statusを判定

async_handlers.py の処理を実行

次のイベントをQueueへ投入

article.saved / process.completed で終了

疑似非同期との違い

第8回は、実際には以下のように1本の流れでした。

イベントを1件取り出す

処理する

次のイベントを1件取り出す

処理する

第9回では、Workerを用意します。

Worker 1:イベント待機
Worker 2:イベント待機
Worker 3:イベント待機

ただし、今回のAI記事作成フローは、

Planner → Researcher → Writer → Reviewer → Validator → Save

という依存関係が強いため、すべてを完全に並列化するわけではありません。

第9回での本質は、処理基盤を非同期イベント駆動型にすることです。

第9回で実装するWorkerの考え方

Workerは、Queueからイベントを受け取り、Dispatcherに渡す役割です。

これにより、イベントが発生するたびにWorkerが処理します。

第9回AI記事作成処理での目標

第9回では、以下を目標にします。

・asyncio.Queue を導入する
・Workerを起動する
・AsyncDispatcherを作る
・Handlerを async 対応する
・同期LLM処理は asyncio.to_thread で呼ぶ
・event_log.json は継続して保存する
・第8回と同じ記事作成フローを非同期基盤で動かす
非同期イベント駆動型のAI記事作成処理フロー図

第9回AI記事作成での重要な変更点

第8回AI記事作成では、EventQueue にイベントを積み、Dispatcher が1件ずつ処理する 疑似非同期 でした。

第9回AI記事作成では、これを次のようにasyncio.QueueとWorkerでマルチエージェントを並行制御する方向に変更させています。

第8回:
EventQueue
↓
while文で1件ずつ処理
↓
Dispatcher
↓
Handler

第9回:
asyncio.Queue
↓
複数Workerが待機
↓
Dispatcherがイベントを振り分け
↓
async Handlerで処理

ただし、今回のシステムでは run_planner() や run_writer()、run_reviewer() など、多くのAI処理は通常の同期関数です。
これらは asyncio.to_thread() で包み、asyncio のイベントループを止めにくい形で実行します。

asyncio.to_thread()は、同期関数を、asyncioの流れを止めない形で実行するための仕組みです。
これにより、同期関数を別スレッドで実行し、asyncio の流れを止めないようにしています。

一方、Web検索処理では、複数の検索クエリを asyncio.gather() でまとめて実行します。

各検索処理が同期的なAPI呼び出しとして実装されている場合でも、asyncio.to_thread() と組み合わせることで、非同期の流れの中で並列的に処理できます。

非同期イベント駆動型処理の構成図

第9回AI記事作成ファイル構成

第8回のファイルをいきなり壊さないため、最初は 第9回用の別ファイル として作るのが適しています。

article-agent/
├ main_async.py ← 追加:第9回用の起動ファイル
├ async_dispatcher.py ← 追加
├ async_handlers.py ← 追加
├ async_worker.py ← 追加
├ event_types.py ← 変更
├ event_bus.py ← 継続
├ runtime_state.py ← 追加
├ dispatcher.py ← 残す
├ handlers.py ← 残す
├ main.py ← 残す
└ agents/

この構成にすると、第8回版も残したまま、第9回版を安全に試すことができます。

プログラム実行時は次のように起動方法を分けます。

第8回版起動:
python main.py

第9回版起動:
python main_async.py

第9回AI記事作成 追加・変更ファイル

以下に、第9回AI記事作成で追加するファイル一式を書き出します。

今回は、第8回のファイルを壊さず第9回AI記事作成の非同期版ファイルを追加する構成にします。

追加:
・main_async.py
・async_worker.py
・async_dispatcher.py
・async_handlers.py
・runtime_state.py

変更:
・event_types.py

第8回の event_bus.py は基本的に流用します。

追加:async_worker.py

追加:async_handlers.py

追加:async_dispatcher.py

追加:runtime_state.py

変更:event_types.py

追加:main_async.py

既存ファイルの扱い

第9回では、以下は第8回版をそのまま使います。

event_bus.py
review_parser.py
validators.py
mcp_client.py
mcp_server.py
web_search_tool.py
agents/planner.py
agents/researcher.py
agents/writer.py
agents/reviewer.py

第9回AI記事作成の実行方法

第9回AI記事作成での実行は、以下のコマンドで行います。

main_async.py は複数行入力に対応しています。
キーワードを入力し、最後に空行のままEnterを押すと処理が始まります。

実行時に生成されるファイル

第9回AI記事作成の実行時に、以下のファイルが生成されます。

event_log.json
final_article.html
draft_needs_review.html
run_summary.txt
review_result_*.json
validation_*.txt

記事作成実行後は、上記ファイルにより動作確認を行います。

また、第9回AI記事作成で生成される記事の保存先は、以下のように変更されます。

review_passed: true
→ final_article.html

review_passed: false
→ draft_needs_review.html

第9回では、作成した記事の公開可否を判断しやすくするために、レビューNGの記事をdraft_needs_review.html として分けて保存しています。

第9回AI記事作成の実行結果

ターミナル出力

(UTF-8)

(SHIFT-JIS)

出力記事

(UTF-8)

(SHIFT-JIS)

event_log.json

(UTF-8)

(SHIFT-JIS)

event_log.jsonから分かる実行結果

event_log.jsonから、process.started で第9回AI記事作成の asyncio.Queue + Worker 版として起動し、worker_count: 3 により3つのWorkerを用意したことが分かります。

その後、Planner AI、Researcher AI、Writer AI、Reviewer AI、Validator、保存処理の順にイベントが記録されています。

また、5件の web_search.completed が同じ時刻に記録されているため、Web検索部分は並列的に実行されたと考えられます。

main_async.py
|
| process.started
| mode: asyncio_queue_worker / worker_count: 3
v
Planner AI
|
| plan.created
v
Researcher AI
|
| research.query_created
v
Web Search
|
| web_search.completed × 5
| ※5件が同時刻に完了
v
Researcher AI
|
| research.completed
v
Writer AI
|
| draft.created
v
Reviewer AI
|
| review.completed / status: OK
v
Validator
|
| validation.completed / status: OK
v
Save
|
| article.saved / review_passed: true
v
process.completed / status: OK

まとめ

第9回AI記事作成では、第8回のイベントキュー型構成を発展させ、asyncio.Queue と Worker を使った非同期イベント駆動型のAI記事作成システムを作成しました。

main_async.py で asyncio.Queue を用意し、複数のWorkerがイベントを待機します。
イベントが投入されると、WorkerがQueueから取り出し、AsyncDispatcherが event と status を見て、Planner、Researcher、Writer、Reviewer、Validator、保存処理へ振り分けます。

また、既存の run_writer() や run_reviewer() などの同期関数は、asyncio.to_thread() を使って別スレッドで実行する形にしました。これにより、既存コードを大きく変更せずに、非同期イベント駆動型の構成へ組み込めます。

実行結果では、event_log.json に mode: asyncio_queue_worker と worker_count: 3 が記録され、第9回版として動作していることを確認できました。Reviewer AIとValidatorの判定もOKとなり、最終記事は final_article.html として保存されました。

これにより、AI記事作成システムは、単なる順番実行から、非同期イベント駆動型の基盤へと一歩進みました。

次回以降に向けて

次回以降では、Ollama API呼び出しやWeb検索処理を async def に対応させ、httpx.AsyncClient などを使って関数そのものを非同期化していきます。
これにより、to_thread() に依存せず、より自然な非同期イベント駆動型の構成へ発展できます。