第8回では、Event Queue と Dispatcher を導入し、AI記事作成システムをイベントキュー型の構成へ発展させました。
第9回では、この仕組みをさらに進め、asyncio.Queue と Worker を使った非同期イベント駆動型の構成に変更します。
第9回AI記事作成の処理イメージ
main_async.py
↓
asyncio.Queue を作成
↓
process.started を Queue に投入
↓
Workerを複数起動
↓
WorkerがQueueからイベントを取得
↓
AsyncDispatcherがevent/statusを判定
↓
async_handlers.py の処理を実行
↓
次のイベントをQueueへ投入
↓
article.saved / process.completed で終了
疑似非同期との違い
第8回は、実際には以下のように1本の流れでした。
イベントを1件取り出す
↓
処理する
↓
次のイベントを1件取り出す
↓
処理する第9回では、Workerを用意します。
Worker 1:イベント待機
Worker 2:イベント待機
Worker 3:イベント待機ただし、今回のAI記事作成フローは、
Planner → Researcher → Writer → Reviewer → Validator → Saveという依存関係が強いため、すべてを完全に並列化するわけではありません。
第9回での本質は、処理基盤を非同期イベント駆動型にすることです。
第9回で実装するWorkerの考え方
Workerは、Queueからイベントを受け取り、Dispatcherに渡す役割です。
while not stop_event.is_set():
event_record = await queue.get()
await dispatcher.dispatch(event_record)
queue.task_done()これにより、イベントが発生するたびにWorkerが処理します。
第9回AI記事作成処理での目標
第9回では、以下を目標にします。
・asyncio.Queue を導入する
・Workerを起動する
・AsyncDispatcherを作る
・Handlerを async 対応する
・同期LLM処理は asyncio.to_thread で呼ぶ
・event_log.json は継続して保存する
・第8回と同じ記事作成フローを非同期基盤で動かす
第9回AI記事作成での重要な変更点
第8回AI記事作成では、EventQueue にイベントを積み、Dispatcher が1件ずつ処理する 疑似非同期 でした。
第9回AI記事作成では、これを次のようにasyncio.QueueとWorkerでマルチエージェントを並行制御する方向に変更させています。
第8回:
EventQueue
↓
while文で1件ずつ処理
↓
Dispatcher
↓
Handler
第9回:
asyncio.Queue
↓
複数Workerが待機
↓
Dispatcherがイベントを振り分け
↓
async Handlerで処理ただし、今回のシステムでは run_planner() や run_writer()、run_reviewer() など、多くのAI処理は通常の同期関数です。
これらは asyncio.to_thread() で包み、asyncio のイベントループを止めにくい形で実行します。
await asyncio.to_thread(run_writer, ...)asyncio.to_thread()は、同期関数を、asyncioの流れを止めない形で実行するための仕組みです。
これにより、同期関数を別スレッドで実行し、asyncio の流れを止めないようにしています。
一方、Web検索処理では、複数の検索クエリを asyncio.gather() でまとめて実行します。
各検索処理が同期的なAPI呼び出しとして実装されている場合でも、asyncio.to_thread() と組み合わせることで、非同期の流れの中で並列的に処理できます。

第9回AI記事作成ファイル構成
第8回のファイルをいきなり壊さないため、最初は 第9回用の別ファイル として作るのが適しています。
article-agent/
├ main_async.py ← 追加:第9回用の起動ファイル
├ async_dispatcher.py ← 追加
├ async_handlers.py ← 追加
├ async_worker.py ← 追加
├ event_types.py ← 変更
├ event_bus.py ← 継続
├ runtime_state.py ← 追加
├ dispatcher.py ← 残す
├ handlers.py ← 残す
├ main.py ← 残す
└ agents/
この構成にすると、第8回版も残したまま、第9回版を安全に試すことができます。
プログラム実行時は次のように起動方法を分けます。
第8回版起動:
python main.py
第9回版起動:
python main_async.py
第9回AI記事作成 追加・変更ファイル
以下に、第9回AI記事作成で追加するファイル一式を書き出します。
今回は、第8回のファイルを壊さず第9回AI記事作成用の非同期版ファイルを追加する構成にします。
追加:
・main_async.py
・async_worker.py
・async_dispatcher.py
・async_handlers.py
・runtime_state.py
変更:
・event_types.py第8回の event_bus.py は基本的に流用します。
追加:async_worker.py
import asyncio
import event_types as events
async def worker_loop(
*,
name: str,
event_queue: asyncio.Queue,
dispatcher,
event_bus,
run_dir: str,
save_event_log,
log_lock: asyncio.Lock,
) -> None:
"""
第9回:非同期Worker。
asyncio.Queue からイベントを受け取り、
AsyncDispatcher に渡して処理する。
None が投入された場合は停止シグナルとして扱う。
"""
while True:
event_record = await event_queue.get()
try:
if event_record is None:
return
if dispatcher.finished:
continue
await dispatcher.dispatch(event_record)
except Exception as error:
event_bus.emit(
events.PROCESS_FAILED,
status="ERROR",
payload={
"worker": name,
"message": str(error),
},
)
dispatcher.finished = True
finally:
event_queue.task_done()
async with log_lock:
await asyncio.to_thread(
save_event_log,
run_dir,
event_bus,
)追加:async_handlers.py
import asyncio
from agents.planner import run_planner
from agents.researcher import create_research_queries, run_researcher
from agents.writer import run_writer
from agents.reviewer import run_reviewer
from mcp_client import save_text_file, save_article, save_run_summary, web_search
from validators import validate_html_fragment, sanitize_html_fragment
from web_search_tool import web_search_results_to_text, web_search_results_to_json
from review_parser import (
parse_review_result,
review_result_to_text,
build_review_json_feedback,
build_unknown_review_feedback,
)
import event_types as events
MAX_SEARCH_RESULTS_PER_QUERY = 3
async def emit_and_queue(
*,
event_bus,
event_queue: asyncio.Queue,
event: str,
status: str = "OK",
payload: dict | None = None,
) -> dict:
"""
イベントをEventBusに記録し、asyncio.Queueへ投入する。
"""
event_record = event_bus.emit(
event,
status=status,
payload=payload or {},
)
await event_queue.put(event_record)
return event_record
def build_validation_text(issues: list[str]) -> str:
"""
Python側HTMLチェック結果を保存用テキストに変換する。
"""
if not issues:
return "Python側HTMLチェック:OK\n"
lines = [
"Python側HTMLチェック:修正必要",
"",
]
for issue in issues:
lines.append(f"- {issue}")
return "\n".join(lines)
def build_validation_review(issues: list[str]) -> str:
"""
Python側チェックで見つかった問題を、
Writer AIに渡せるレビュー文に変換する。
"""
issue_text = "\n".join(f"- {issue}" for issue in issues)
return f"""
Python側のHTML形式チェックで、以下の問題が見つかりました。
{issue_text}
Writer AIへの修正指示:
・上記の問題をすべて修正してください。
・記事本文の内容は削除せず、維持してください。
・プレースホルダーに置き換えないでください。
・HTML断片として出力してください。
・styleタグ、style属性、CSS、JavaScriptは使わないでください。
・Markdown記法は使わず、HTMLタグに変換してください。
・必ず修正後の記事全文を出力してください。
status:
VALIDATION_FAILED
"""
async def handle_planner(state, event_bus, event_queue: asyncio.Queue) -> None:
"""
Planner AIを非同期Workerから実行する。
実体は同期関数なので asyncio.to_thread で包む。
"""
print("\n[1] Planner AI が記事設計を作成中...")
state.plan = await asyncio.to_thread(
run_planner,
state.keyword,
)
plan_path = f"{state.run_dir}/plan.txt"
await asyncio.to_thread(
save_text_file,
plan_path,
state.plan,
)
await emit_and_queue(
event_bus=event_bus,
event_queue=event_queue,
event=events.PLAN_CREATED,
status="OK",
payload={
"path": plan_path,
},
)
async def handle_researcher(state, event_bus, event_queue: asyncio.Queue) -> None:
"""
Researcher AIで検索クエリ作成、Web検索、調査メモ作成まで行う。
Web検索部分は asyncio.gather で並列実行する。
"""
print("\n[2] Researcher AI が検索クエリを作成中...")
state.research_queries = await asyncio.to_thread(
create_research_queries,
state.keyword,
state.plan,
)
state.research_query_text = "\n".join(
f"- {query}" for query in state.research_queries
)
research_query_path = f"{state.run_dir}/research_query.txt"
await asyncio.to_thread(
save_text_file,
research_query_path,
state.research_query_text,
)
event_bus.emit(
events.RESEARCH_QUERY_CREATED,
status="OK",
payload={
"path": research_query_path,
"queries": state.research_queries,
},
)
print("\n--- Researcher AI の検索クエリ ---")
print(state.research_query_text)
print("--- 検索クエリここまで ---")
print("\n[3] Brave Search APIでWeb検索中...")
async def run_single_search(query: str) -> dict:
print(f"- 検索中: {query}")
result = await asyncio.to_thread(
web_search,
query=query,
max_results=MAX_SEARCH_RESULTS_PER_QUERY,
)
event_bus.emit(
events.WEB_SEARCH_COMPLETED,
status=result.get("status", "UNKNOWN").upper(),
payload={
"query": query,
"provider": result.get("provider", ""),
"result_count": len(result.get("results", [])),
"message": result.get("message", ""),
},
)
return result
all_search_results = await asyncio.gather(
*(run_single_search(query) for query in state.research_queries)
)
search_results_json = web_search_results_to_json(all_search_results)
state.search_results_text = web_search_results_to_text(all_search_results)
web_search_results_path = f"{state.run_dir}/web_search_results.json"
await asyncio.to_thread(
save_text_file,
web_search_results_path,
search_results_json,
)
event_bus.emit(
events.WEB_SEARCH_RESULTS_SAVED,
status="OK",
payload={
"path": web_search_results_path,
},
)
print("\n[4] Researcher AI が調査メモを作成中...")
state.research_note = await asyncio.to_thread(
run_researcher,
state.keyword,
state.plan,
state.search_results_text,
)
research_note_path = f"{state.run_dir}/research_note.txt"
await asyncio.to_thread(
save_text_file,
research_note_path,
state.research_note,
)
print("\n--- Researcher AI の調査メモ ---")
print(state.research_note)
print("--- 調査メモここまで ---")
await emit_and_queue(
event_bus=event_bus,
event_queue=event_queue,
event=events.RESEARCH_COMPLETED,
status="OK",
payload={
"path": research_note_path,
},
)
async def handle_writer(
state,
event_bus,
event_queue: asyncio.Queue,
revision_source: str = "",
) -> None:
"""
Writer AIで初稿または修正版を作成する。
"""
if state.loop_count == 0 and not state.article:
print("\n[5] Writer AI が初稿を作成中...")
state.article = await asyncio.to_thread(
run_writer,
keyword=state.keyword,
plan=state.plan,
research_note=state.research_note,
)
state.article = sanitize_html_fragment(state.article)
draft_path = f"{state.run_dir}/draft_1.html"
await asyncio.to_thread(
save_text_file,
draft_path,
state.article,
)
await emit_and_queue(
event_bus=event_bus,
event_queue=event_queue,
event=events.DRAFT_CREATED,
status="OK",
payload={
"loop": 0,
"path": draft_path,
},
)
return
print(
f"\n[7] Writer AI が指摘に従って修正中... "
f"({state.loop_count}/{state.max_review_loops})"
)
if revision_source == "validator":
review_for_writer = build_validation_review(state.final_validation_issues)
elif revision_source == "reviewer":
review_for_writer = build_review_json_feedback(state.latest_review_result)
else:
review_for_writer = build_unknown_review_feedback(
state.latest_review,
state.latest_review_result,
)
event_bus.emit(
events.WRITER_REVISION_REQUESTED,
status="REVISION_REQUIRED",
payload={
"loop": state.loop_count,
"source": revision_source or "unknown",
},
)
state.article = await asyncio.to_thread(
run_writer,
keyword=state.keyword,
plan=state.plan,
research_note=state.research_note,
review=review_for_writer,
previous_draft=state.article,
)
state.article = sanitize_html_fragment(state.article)
next_draft_number = state.loop_count + 1
revised_draft_path = f"{state.run_dir}/draft_{next_draft_number}.html"
await asyncio.to_thread(
save_text_file,
revised_draft_path,
state.article,
)
await emit_and_queue(
event_bus=event_bus,
event_queue=event_queue,
event=events.DRAFT_REVISED,
status="OK",
payload={
"loop": state.loop_count,
"source": revision_source or "unknown",
"path": revised_draft_path,
},
)
async def handle_reviewer(state, event_bus, event_queue: asyncio.Queue) -> None:
"""
Reviewer AIを実行する。
"""
state.loop_count += 1
print(
f"\n[6] Reviewer AI がレビュー中... "
f"({state.loop_count}/{state.max_review_loops})"
)
state.latest_review = await asyncio.to_thread(
run_reviewer,
keyword=state.keyword,
draft=state.article,
)
review_path = f"{state.run_dir}/review_{state.loop_count}.txt"
await asyncio.to_thread(
save_text_file,
review_path,
state.latest_review,
)
print("\n--- Reviewer AI のレビュー結果 ---")
print(state.latest_review)
print("--- レビュー結果ここまで ---")
state.latest_review_result = parse_review_result(state.latest_review)
state.final_review_status = state.latest_review_result["status"]
review_result_path = f"{state.run_dir}/review_result_{state.loop_count}.json"
await asyncio.to_thread(
save_text_file,
review_result_path,
review_result_to_text(state.latest_review_result),
)
print("\n--- Reviewer AI の判定JSON ---")
print(review_result_to_text(state.latest_review_result))
print("--- 判定JSONここまで ---")
await emit_and_queue(
event_bus=event_bus,
event_queue=event_queue,
event=events.REVIEW_COMPLETED,
status=state.final_review_status,
payload={
"loop": state.loop_count,
"review_path": review_path,
"review_result_path": review_result_path,
"summary": state.latest_review_result.get("summary", ""),
"issues": state.latest_review_result.get("issues", []),
"next_action": state.latest_review_result.get("next_action", ""),
},
)
async def handle_validator(state, event_bus, event_queue: asyncio.Queue) -> None:
"""
Python側HTMLチェックを実行する。
"""
state.article = sanitize_html_fragment(state.article)
validation_issues = validate_html_fragment(state.article)
state.final_validation_issues = validation_issues
validation_text = build_validation_text(validation_issues)
validation_path = f"{state.run_dir}/validation_{state.loop_count}.txt"
await asyncio.to_thread(
save_text_file,
validation_path,
validation_text,
)
validation_status = "FAILED" if validation_issues else "OK"
if validation_issues:
print("\nPython側HTMLチェック:修正必要")
for issue in validation_issues:
print(f"- {issue}")
else:
print("\nPython側HTMLチェック:OK")
await emit_and_queue(
event_bus=event_bus,
event_queue=event_queue,
event=events.VALIDATION_COMPLETED,
status=validation_status,
payload={
"loop": state.loop_count,
"path": validation_path,
"issues": validation_issues,
},
)
async def handle_save_article(state, event_bus, event_queue: asyncio.Queue) -> None:
"""
最終記事を保存する。
第9回では、Reviewer/Validatorを通過した記事と、
最大レビュー回数到達で保存された未承認記事を分ける。
"""
print("\n[8] 記事HTMLを保存中...")
state.article = sanitize_html_fragment(state.article)
if state.review_passed:
result = await asyncio.to_thread(
save_article,
state.run_dir,
state.article,
)
state.final_article_path = result["path"]
saved_type = "approved_final_article"
else:
needs_review_path = f"{state.run_dir}/draft_needs_review.html"
await asyncio.to_thread(
save_text_file,
needs_review_path,
state.article,
)
state.final_article_path = needs_review_path
saved_type = "needs_human_review"
await emit_and_queue(
event_bus=event_bus,
event_queue=event_queue,
event=events.ARTICLE_SAVED,
status="OK",
payload={
"path": state.final_article_path,
"saved_type": saved_type,
"review_passed": state.review_passed,
"final_review_status": state.final_review_status,
},
)
async def handle_run_summary(state, event_bus) -> None:
"""
実行概要を保存する。
"""
if state.review_passed:
status_text = "Reviewer AIのJSON判定とPython側HTMLチェックのOK後に保存"
else:
status_text = "最大レビュー回数到達のため、未承認記事として保存"
if state.final_validation_issues:
validation_summary = "\n".join(
f"- {issue}" for issue in state.final_validation_issues
)
else:
validation_summary = "Python側HTMLチェック:OK"
review_json_summary = review_result_to_text(state.latest_review_result)
summary = f"""実行キーワード:
{state.keyword}
保存状態:
{status_text}
Reviewer最終JSON判定:
{state.final_review_status}
最終保存先:
{state.final_article_path}
レビュー最大回数:
{state.max_review_loops}
実行結果フォルダ:
{state.run_dir}
検索クエリ:
{state.research_query_text}
最終Reviewer判定JSON:
{review_json_summary}
最終Python側HTMLチェック:
{validation_summary}
イベントログ:
{state.run_dir}/event_log.json
"""
await asyncio.to_thread(
save_run_summary,
state.run_dir,
summary,
)
event_bus.emit(
events.PROCESS_COMPLETED,
status="OK" if state.review_passed else "COMPLETED_WITH_MAX_LOOPS",
payload={
"keyword": state.keyword,
"run_dir": state.run_dir,
"final_article_path": state.final_article_path,
"review_passed": state.review_passed,
},
)追加:async_dispatcher.py
import asyncio
import event_types as events
from async_handlers import (
handle_planner,
handle_researcher,
handle_writer,
handle_reviewer,
handle_validator,
handle_save_article,
handle_run_summary,
emit_and_queue,
)
class AsyncDispatcher:
"""
第9回:非同期Dispatcher。
asyncio.Queue から取り出されたイベントを受け取り、
event / status に応じて次の async Handler を呼び出す。
"""
def __init__(self, state, event_bus, event_queue: asyncio.Queue):
self.state = state
self.event_bus = event_bus
self.event_queue = event_queue
self.finished = False
async def dispatch(self, event_record: dict) -> None:
"""
イベント1件を処理する。
"""
event = event_record.get("event")
status = event_record.get("status")
payload = event_record.get("payload", {})
if event == events.PROCESS_STARTED:
await handle_planner(
self.state,
self.event_bus,
self.event_queue,
)
return
if event == events.PLAN_CREATED:
await handle_researcher(
self.state,
self.event_bus,
self.event_queue,
)
return
if event == events.RESEARCH_COMPLETED:
await handle_writer(
self.state,
self.event_bus,
self.event_queue,
)
return
if event in (events.DRAFT_CREATED, events.DRAFT_REVISED):
await handle_reviewer(
self.state,
self.event_bus,
self.event_queue,
)
return
if event == events.REVIEW_COMPLETED:
await self._handle_review_completed(status)
return
if event == events.ARTICLE_REVISION_REQUIRED:
await self._handle_article_revision_required(payload)
return
if event == events.REVIEW_UNKNOWN:
await self._handle_review_unknown(payload)
return
if event == events.VALIDATION_COMPLETED:
await self._handle_validation_completed(status)
return
if event == events.VALIDATION_FAILED:
await self._handle_validation_failed(payload)
return
if event == events.ARTICLE_APPROVED:
await handle_save_article(
self.state,
self.event_bus,
self.event_queue,
)
return
if event == events.ARTICLE_SAVED:
await handle_run_summary(
self.state,
self.event_bus,
)
self.finished = True
return
async def _handle_review_completed(self, status: str) -> None:
"""
Reviewer AIの判定結果に応じて分岐する。
"""
if status == "OK":
await handle_validator(
self.state,
self.event_bus,
self.event_queue,
)
return
if status == "REVISION_REQUIRED":
await emit_and_queue(
event_bus=self.event_bus,
event_queue=self.event_queue,
event=events.ARTICLE_REVISION_REQUIRED,
status="REVISION_REQUIRED",
payload={
"loop": self.state.loop_count,
"source": "reviewer",
"issues": self.state.latest_review_result.get("issues", []),
},
)
return
await emit_and_queue(
event_bus=self.event_bus,
event_queue=self.event_queue,
event=events.REVIEW_UNKNOWN,
status="UNKNOWN",
payload={
"loop": self.state.loop_count,
"source": "reviewer",
"reason": "Reviewer AI の判定JSONが不明です。",
},
)
async def _handle_article_revision_required(self, payload: dict) -> None:
"""
Reviewer AI起点の修正要求を処理する。
"""
if not await self._can_continue_revision():
return
await handle_writer(
self.state,
self.event_bus,
self.event_queue,
revision_source=payload.get("source", "reviewer"),
)
async def _handle_review_unknown(self, payload: dict) -> None:
"""
Reviewer AIのJSON判定が不明な場合の処理。
"""
if not await self._can_continue_revision():
return
await handle_writer(
self.state,
self.event_bus,
self.event_queue,
revision_source="unknown_review",
)
async def _handle_validation_completed(self, status: str) -> None:
"""
Validatorの結果に応じて分岐する。
"""
if status == "OK":
self.state.review_passed = True
await emit_and_queue(
event_bus=self.event_bus,
event_queue=self.event_queue,
event=events.ARTICLE_APPROVED,
status="OK",
payload={
"loop": self.state.loop_count,
"review_status": self.state.final_review_status,
"validation_status": status,
},
)
return
await emit_and_queue(
event_bus=self.event_bus,
event_queue=self.event_queue,
event=events.VALIDATION_FAILED,
status="FAILED",
payload={
"loop": self.state.loop_count,
"source": "validator",
"issues": self.state.final_validation_issues,
},
)
async def _handle_validation_failed(self, payload: dict) -> None:
"""
Validator起点の修正要求を処理する。
"""
if not await self._can_continue_revision():
return
await handle_writer(
self.state,
self.event_bus,
self.event_queue,
revision_source=payload.get("source", "validator"),
)
async def _can_continue_revision(self) -> bool:
"""
最大レビュー回数に達していないか確認する。
"""
if self.state.loop_count >= self.state.max_review_loops:
print("\n最大レビュー回数に達しました。未承認記事として保存します。")
self.event_bus.emit(
events.REVIEW_MAX_LOOPS_REACHED,
status="MAX_LOOPS_REACHED",
payload={
"loop": self.state.loop_count,
"max_review_loops": self.state.max_review_loops,
"review_status": self.state.final_review_status,
"validation_issues": self.state.final_validation_issues,
},
)
await emit_and_queue(
event_bus=self.event_bus,
event_queue=self.event_queue,
event=events.ARTICLE_APPROVED,
status="MAX_LOOPS_REACHED",
payload={
"loop": self.state.loop_count,
"reason": "最大レビュー回数に達したため、未承認記事として保存します。",
},
)
return False
return True追加:runtime_state.py
from dataclasses import dataclass, field
@dataclass
class RuntimeState:
"""
第8回・第9回共通:イベント駆動型処理で共有する実行状態。
main.py / main_async.py / Dispatcher / Handler 間で、
keyword、plan、research_note、article、review結果などを
受け渡すために使う。
"""
# 基本情報
keyword: str
run_dir: str
# Planner AI の出力
plan: str = ""
# Researcher AI / Web検索 関連
research_queries: list[str] = field(default_factory=list)
research_query_text: str = ""
search_results_text: str = ""
research_note: str = ""
# Writer AI の出力
article: str = ""
# Reviewer AI の出力
latest_review: str = ""
latest_review_result: dict = field(default_factory=dict)
# ループ制御
loop_count: int = 0
max_review_loops: int = 3
# 最終判定
review_passed: bool = False
final_review_status: str = "UNKNOWN"
final_validation_issues: list[str] = field(default_factory=list)
# 保存結果
final_article_path: str = ""変更:event_types.py
"""
第9回:非同期イベント駆動型マルチエージェント用のイベント定義。
asyncio.Queue + Worker 構成でも、第8回と同じイベント名を使う。
Dispatcher は event / status を見て次の処理を判断する。
"""
# Process
PROCESS_STARTED = "process.started"
PROCESS_COMPLETED = "process.completed"
PROCESS_FAILED = "process.failed"
# Planner AI
RUN_PLANNER = "run.planner"
PLAN_CREATED = "plan.created"
# Researcher AI
RUN_RESEARCHER = "run.researcher"
RESEARCH_QUERY_CREATED = "research.query_created"
WEB_SEARCH_COMPLETED = "web_search.completed"
WEB_SEARCH_RESULTS_SAVED = "web_search_results.saved"
RESEARCH_COMPLETED = "research.completed"
# Writer AI
RUN_WRITER = "run.writer"
DRAFT_CREATED = "draft.created"
DRAFT_REVISED = "draft.revised"
WRITER_REVISION_REQUESTED = "writer.revision_requested"
# Reviewer AI
RUN_REVIEWER = "run.reviewer"
REVIEW_COMPLETED = "review.completed"
REVIEW_UNKNOWN = "review.unknown"
ARTICLE_REVISION_REQUIRED = "article.revision_required"
REVIEW_MAX_LOOPS_REACHED = "review.max_loops_reached"
# Validator
RUN_VALIDATOR = "run.validator"
VALIDATION_COMPLETED = "validation.completed"
VALIDATION_FAILED = "validation.failed"
# Approval / Save
ARTICLE_APPROVED = "article.approved"
SAVE_ARTICLE = "save.article"
ARTICLE_SAVED = "article.saved"追加:main_async.py
import asyncio
from datetime import datetime
import re
from event_bus import EventBus
from runtime_state import RuntimeState
from async_dispatcher import AsyncDispatcher
from async_worker import worker_loop
import event_types as events
MAX_REVIEW_LOOPS = 3
WORKER_COUNT = 3
def slugify_keyword(keyword: str, max_length: int = 40) -> str:
"""
入力キーワードをフォルダ名に使いやすい形へ整える。
日本語は残しつつ、ファイル名に使いにくい記号を除去する。
"""
slug = re.sub(r'[\\/:*?"<>|]+', "_", keyword)
slug = re.sub(r"\s+", "_", slug)
slug = slug.strip("_")
if not slug:
slug = "article"
return slug[:max_length]
def input_multiline(prompt: str = "記事テーマ・条件を入力してください:") -> str:
"""
複数行の入力を受け取る。
空行で入力終了。
"""
print(prompt)
print("入力が終わったら、空行のままEnterを押してください。")
print("--------------------------------------------------")
lines = []
while True:
line = input()
if line.strip() == "":
break
lines.append(line)
return "\n".join(lines).strip()
def save_event_log(run_dir: str, event_bus: EventBus) -> None:
"""
EventBusに記録されたイベント履歴を event_log.json として保存する。
"""
event_log_path = f"{run_dir}/event_log.json"
with open(event_log_path, "w", encoding="utf-8") as file:
file.write(event_bus.to_json())
async def run_async_event_system() -> None:
"""
第9回:asyncio.Queue + Worker 版のメイン処理。
"""
print("Gemma 4ローカル・マルチエージェント記事作成システム")
print("第9回:asyncio.Queue + Worker 版")
print("--------------------------------------------------")
keyword = input_multiline()
if not keyword:
print("記事テーマが空です。処理を終了します。")
return
now = datetime.now()
keyword_slug = slugify_keyword(keyword)
run_dir = f"output/{now:%Y%m%d_%H%M%S}_{keyword_slug}"
print(f"\n実行結果フォルダ: {run_dir}")
event_bus = EventBus()
event_queue = asyncio.Queue()
log_lock = asyncio.Lock()
state = RuntimeState(
keyword=keyword,
run_dir=run_dir,
max_review_loops=MAX_REVIEW_LOOPS,
)
dispatcher = AsyncDispatcher(
state=state,
event_bus=event_bus,
event_queue=event_queue,
)
workers = [
asyncio.create_task(
worker_loop(
name=f"worker-{index + 1}",
event_queue=event_queue,
dispatcher=dispatcher,
event_bus=event_bus,
run_dir=run_dir,
save_event_log=save_event_log,
log_lock=log_lock,
)
)
for index in range(WORKER_COUNT)
]
start_event = event_bus.emit(
events.PROCESS_STARTED,
status="OK",
payload={
"keyword": keyword,
"run_dir": run_dir,
"max_review_loops": MAX_REVIEW_LOOPS,
"mode": "asyncio_queue_worker",
"worker_count": WORKER_COUNT,
},
)
await event_queue.put(start_event)
await event_queue.join()
for _ in workers:
await event_queue.put(None)
await asyncio.gather(*workers)
save_event_log(run_dir, event_bus)
print("\n完了しました。")
print(f"保存先: {state.final_article_path}")
print(f"実行結果フォルダ: {run_dir}")
print(f"イベントログ: {run_dir}/event_log.json")
print(f"Reviewer最終JSON判定:{state.final_review_status}")
if state.review_passed:
print("保存状態:Reviewer AI のJSON判定とPython側HTMLチェックのOK後に final_article.html として保存しました。")
else:
print("保存状態:最大レビュー回数に達したため draft_needs_review.html として保存しました。")
def main() -> None:
try:
asyncio.run(run_async_event_system())
except KeyboardInterrupt:
print("\n処理を中断しました。")
if __name__ == "__main__":
main()既存ファイルの扱い
第9回では、以下は第8回版をそのまま使います。
event_bus.py
review_parser.py
validators.py
mcp_client.py
mcp_server.py
web_search_tool.py
agents/planner.py
agents/researcher.py
agents/writer.py
agents/reviewer.py第9回AI記事作成の実行方法
第9回AI記事作成での実行は、以下のコマンドで行います。
python main_async.pymain_async.py は複数行入力に対応しています。
キーワードを入力し、最後に空行のままEnterを押すと処理が始まります。
実行時に生成されるファイル
第9回AI記事作成の実行時に、以下のファイルが生成されます。
event_log.json
final_article.html
draft_needs_review.html
run_summary.txt
review_result_*.json
validation_*.txt記事作成実行後は、上記ファイルにより動作確認を行います。
また、第9回AI記事作成で生成される記事の保存先は、以下のように変更されます。
review_passed: true
→ final_article.html
review_passed: false
→ draft_needs_review.html第9回では、作成した記事の公開可否を判断しやすくするために、レビューNGの記事をdraft_needs_review.html として分けて保存しています。
第9回AI記事作成の実行結果
ターミナル出力
(UTF-8)
(SHIFT-JIS)
出力記事
(UTF-8)
(SHIFT-JIS)
event_log.json
(UTF-8)
(SHIFT-JIS)
event_log.jsonから分かる実行結果
event_log.jsonから、process.started で第9回AI記事作成の asyncio.Queue + Worker 版として起動し、worker_count: 3 により3つのWorkerを用意したことが分かります。
その後、Planner AI、Researcher AI、Writer AI、Reviewer AI、Validator、保存処理の順にイベントが記録されています。
また、5件の web_search.completed が同じ時刻に記録されているため、Web検索部分は並列的に実行されたと考えられます。
main_async.py
|
| process.started
| mode: asyncio_queue_worker / worker_count: 3
v
Planner AI
|
| plan.created
v
Researcher AI
|
| research.query_created
v
Web Search
|
| web_search.completed × 5
| ※5件が同時刻に完了
v
Researcher AI
|
| research.completed
v
Writer AI
|
| draft.created
v
Reviewer AI
|
| review.completed / status: OK
v
Validator
|
| validation.completed / status: OK
v
Save
|
| article.saved / review_passed: true
v
process.completed / status: OK
まとめ
第9回AI記事作成では、第8回のイベントキュー型構成を発展させ、asyncio.Queue と Worker を使った非同期イベント駆動型のAI記事作成システムを作成しました。
main_async.py で asyncio.Queue を用意し、複数のWorkerがイベントを待機します。
イベントが投入されると、WorkerがQueueから取り出し、AsyncDispatcherが event と status を見て、Planner、Researcher、Writer、Reviewer、Validator、保存処理へ振り分けます。
また、既存の run_writer() や run_reviewer() などの同期関数は、asyncio.to_thread() を使って別スレッドで実行する形にしました。これにより、既存コードを大きく変更せずに、非同期イベント駆動型の構成へ組み込めます。
実行結果では、event_log.json に mode: asyncio_queue_worker と worker_count: 3 が記録され、第9回版として動作していることを確認できました。Reviewer AIとValidatorの判定もOKとなり、最終記事は final_article.html として保存されました。
これにより、AI記事作成システムは、単なる順番実行から、非同期イベント駆動型の基盤へと一歩進みました。
次回以降に向けて
次回以降では、Ollama API呼び出しやWeb検索処理を async def に対応させ、httpx.AsyncClient などを使って関数そのものを非同期化していきます。
これにより、to_thread() に依存せず、より自然な非同期イベント駆動型の構成へ発展できます。

