第8回では、第7回で追加した event_log.json によるイベント記録を発展させ、Event Queue、Dispatcher、Handler を導入します。
これにより、main.py がすべての処理を直接制御する構成から、発生したイベントをキューに積み、Dispatcherが event と status を見て次の処理を判断する構成へ変更します。
ただし、第8回ではまだ asyncio や複数Workerは使わず、同期処理でイベントキューを順番に処理する「疑似非同期」の構成として実装します。第9回では、この構成を asyncio.Queue に発展させ、本格的な非同期イベント駆動型へ進める予定です。
第7回AI記事作成から何が変わるのか
第7回AI記事作成では、ローカルAIを使ったマルチエージェント記事作成システムに、event_bus.py(発生イベントを記録) と event_types.py(イベント名一覧) を追加しました。
これにより、Planner AI、Researcher AI、Writer AI、Reviewer AI、Validator の処理結果を、次のようなイベントとして記録できるようになりました。
process.started
plan.created
research.completed
draft.created
review.completed
validation.completed
article.approved
article.savedそして、これらのイベントは event_log.json に保存されます。
ただし、第7回AI記事作成の時点では、まだ main.py が多くの処理判断を担当していました。
たとえば、Reviewer AIの判定が OK ならValidatorへ進み、REVISION_REQUIRED ならWriter AIへ戻す、という分岐は main.py 側に残っていました。
第8回AI記事作成では、この構成をさらに一歩進めます。
今回の目的は、main.py 中心の順番実行から、Event Queue と Dispatcher を使ったイベントキュー型マルチエージェント構成へ整理することです。
第8回AI記事作成の構成
第8回AI記事作成で目指す構成は、次の通りです。
main.py
↓
process.started を発行
↓
Event Queue に積む
↓
Dispatcher がイベントを取り出す
↓
event / status に応じて Handler を呼ぶ
↓
Handler がAI処理や保存処理を実行
↓
次のイベントを Event Queue に積むつまり、main.py がすべての処理を直接呼び出すのではなく、イベントを起点に処理を進める形へ変更します。
第8回AI記事作成で追加する主な要素は以下です。
event_queue.py
runtime_state.py
dispatcher.py
handlers.pyそれぞれの役割は次の通りです。
| ファイル | 役割 |
|---|---|
event_queue.py | 発生したイベントを順番に保持する |
runtime_state.py | keyword、plan、articleなどの実行状態を保持する |
dispatcher.py | event / status を見て次の処理を判断する |
handlers.py | Planner、Researcher、Writer、Reviewer、Validator、保存処理を実行する |
main.py | 初期化とDispatcherループの起動に役割を絞る |
第8回は「正式な非同期処理」ではない
ここで重要なのは、第8回ではまだ asyncio は使わないという点です。
第8回のEvent Queueは、次のような同期処理で動きます。
while event_queue.has_events():
event_record = event_queue.pop()
dispatcher.dispatch(event_record)これは、イベントをキューに積んで順番に処理する構成です。
そのため、構造としてはイベント駆動型に近づいていますが、実行方式としてはまだ完全な非同期処理ではありません。
Event Queueが必要な理由
第7回AI記事作成の構成でも、イベントログは保存できました。
しかし、イベントログはあくまで「何が起きたかの記録」です。
一方、Event Queueは「次に何を処理するか」を管理します。
違いを整理すると、以下のようになります。
| 仕組み | 役割 |
|---|---|
| EventBus | イベントを記録する |
| event_log.json | 発生したイベントの履歴を保存する |
| EventQueue | 次に処理すべきイベントを順番に保持する |
| Dispatcher | キューから取り出したイベントを見て処理を振り分ける |
第8回では、EventBusとEventQueueを分けて考えます。
EventBus
→ イベント履歴を残す
EventQueue
→ 次に処理するイベントを保持するこの分離により、処理の流れがかなり見えやすくなります。
Dispatcherとは何か
Dispatcherは、イベント駆動型システムの「交通整理役」です。
たとえば、次のようなイベントが来たとします。
review.completed + status OKこの場合、Dispatcherは次の処理へ進めます。
Validatorを実行する一方で、Reviewer AIが修正必要と判断した場合は、次のようになります。
review.completed + status REVISION_REQUIREDこの場合、Dispatcherは次のように判断します。
article.revision_required を記録
Writer AIへ戻すつまりDispatcherは、イベントとステータスを見て、次に実行するHandlerを選びます。
event + status
↓
Dispatcher
↓
次のHandler第8回AI記事作成では、この判断を main.py から dispatcher.py へ移動しました。
Handlerとは何か
Handlerは、実際の処理を実行する関数です。
Dispatcherは「何を実行するか」を判断しますが、AIやツールを実際に呼び出すのはHandlerです。
たとえば、以下のようなHandlerを用意します。
handle_planner()
handle_researcher()
handle_writer()
handle_reviewer()
handle_validator()
handle_save_article()
handle_run_summary()それぞれの役割は次の通りです。
| Handler | 実行内容 |
|---|---|
handle_planner() | Planner AIで記事設計を作る |
handle_researcher() | 検索クエリ作成、Web検索、調査メモ作成を行う |
handle_writer() | 初稿または修正版の記事を作る |
handle_reviewer() | Reviewer AIで記事を確認する |
handle_validator() | Python側でHTML構造をチェックする |
handle_save_article() | 最終記事を保存する |
handle_run_summary() | 実行概要を保存する |
このように、DispatcherとHandlerを分けることで、コードの役割が整理されます。
第8回AI記事作成の起動から完了までの流れ

第8回AI記事作成の処理は、次のように進みます。
python main.py
↓
記事キーワードを入力
↓
RuntimeState / EventBus / EventQueue / Dispatcher を初期化
↓
process.started を発行
↓
EventQueue に process.started を追加
↓
Dispatcher が process.started を処理
↓
Planner AI が記事設計を作成
↓
plan.created を発行
↓
Researcher AI が検索・調査メモ作成
↓
research.completed を発行
↓
Writer AI が初稿作成
↓
draft.created を発行
↓
Reviewer AI がレビュー
↓
review.completed を発行
↓
Validator がHTMLチェック
↓
article.approved
↓
final_article.html を保存
↓
article.saved
↓
run_summary.txt を保存
↓
process.completed順調に進んだ場合、イベントログは次のようになります。
process.started
plan.created
research.query_created
web_search.completed
web_search.completed
web_search.completed
web_search_results.saved
research.completed
draft.created
review.completed
validation.completed
article.approved
article.saved
process.completedこのように、処理の流れがイベント名として残るため、どこで何が起きたかをあとから追いやすくなります。
修正ループもイベントで制御する
第8回で重要なのは、修正ループもイベントで制御する点です。
Reviewer AIが修正必要と判断した場合、次の流れになります。
review.completed + status REVISION_REQUIRED
↓
article.revision_required
↓
writer.revision_requested
↓
draft.revised
↓
review.completedまた、Reviewer AIの判定がOKでも、Python側のValidatorでHTML形式の問題が見つかる場合があります。
その場合は、次の流れになります。
validation.completed + status FAILED
↓
validation.failed
↓
writer.revision_requested
↓
draft.revised
↓
review.completedつまり、第8回では修正ループが2種類あります。
Reviewer AI起点の修正ループ
Validator起点の修正ループReviewer AIは記事の内容や構成を見ます。
ValidatorはHTML形式やMarkdown混入、styleタグ、scriptタグなどを確認します。
この2つを分けることで、記事品質と出力形式の両方をチェックできます。
イベントの流れを図で表すと以下の様になります。
main.py
│
├─ process.started を発行
▼
EventQueue
│
▼
Dispatcher
│
├─ process.started → handle_planner()
├─ plan.created → handle_researcher()
├─ research.completed → handle_writer()
├─ draft.created → handle_reviewer()
├─ review.completed + OK → handle_validator()
├─ review.completed + REVISION_REQUIRED → handle_writer()
├─ validation.completed + OK → article.approved
├─ validation.completed + FAILED → handle_writer()
└─ article.approved → handle_save_article()
第8回AI記事作成で追加・変更となるファイル
第8回AI記事作成では、以下のファイルが追加・変更となります。
追加:
・event_queue.py
・runtime_state.py
・dispatcher.py
・handlers.py
変更:
・event_types.py
・main.py
基本的にそのまま:
・event_bus.py
・agents/planner.py
・agents/researcher.py
・agents/writer.py
・agents/reviewer.py
・validators.py
・review_parser.py
・mcp_client.py
・mcp_server.py
・web_search_tool.py
第8回のファイル構成は下記の通りとなります。
article-agent/
├ main.py ← 変更
├ llm_client.py
├ validators.py
├ review_parser.py
├ event_types.py ← 変更
├ event_bus.py ← 変更なしでも可
├ event_queue.py ← 追加
├ runtime_state.py ← 追加
├ dispatcher.py ← 追加
├ handlers.py ← 追加
├ agents/
│ ├ planner.py
│ ├ researcher.py
│ ├ writer.py
│ └ reviewer.py
├ mcp_client.py
├ mcp_server.py
├ web_search_tool.py
└ output/変更:event_types.py
"""
第8回:イベント駆動型マルチエージェント用のイベント定義。
第7回ではイベントログ記録が中心だったが、
第8回では Dispatcher が event / status を見て処理を振り分ける。
"""
# Process
PROCESS_STARTED = "process.started"
PROCESS_COMPLETED = "process.completed"
PROCESS_FAILED = "process.failed"
# Planner AI
RUN_PLANNER = "run.planner"
PLAN_CREATED = "plan.created"
# Researcher AI
RUN_RESEARCHER = "run.researcher"
RESEARCH_QUERY_CREATED = "research.query_created"
WEB_SEARCH_COMPLETED = "web_search.completed"
WEB_SEARCH_RESULTS_SAVED = "web_search_results.saved"
RESEARCH_COMPLETED = "research.completed"
# Writer AI
RUN_WRITER = "run.writer"
DRAFT_CREATED = "draft.created"
DRAFT_REVISED = "draft.revised"
WRITER_REVISION_REQUESTED = "writer.revision_requested"
# Reviewer AI
RUN_REVIEWER = "run.reviewer"
REVIEW_COMPLETED = "review.completed"
REVIEW_UNKNOWN = "review.unknown"
ARTICLE_REVISION_REQUIRED = "article.revision_required"
REVIEW_MAX_LOOPS_REACHED = "review.max_loops_reached"
# Validator
RUN_VALIDATOR = "run.validator"
VALIDATION_COMPLETED = "validation.completed"
VALIDATION_FAILED = "validation.failed"
# Approval / Save
ARTICLE_APPROVED = "article.approved"
SAVE_ARTICLE = "save.article"
ARTICLE_SAVED = "article.saved"追加:event_queue.py
from collections import deque
class EventQueue:
"""
第8回:疑似非同期用のイベントキュー。
asyncio.Queue はまだ使わず、
発生したイベントを順番に積み、Dispatcher が1件ずつ取り出して処理する。
"""
def __init__(self):
self._queue = deque()
def push(self, event_record: dict) -> None:
"""
イベントをキューに追加する。
"""
self._queue.append(event_record)
def pop(self) -> dict | None:
"""
イベントを1件取り出す。
キューが空なら None を返す。
"""
if not self._queue:
return None
return self._queue.popleft()
def has_events(self) -> bool:
"""
キューにイベントが残っているか確認する。
"""
return len(self._queue) > 0
def size(self) -> int:
"""
現在のキュー件数を返す。
"""
return len(self._queue)
def clear(self) -> None:
"""
キューを空にする。
"""
self._queue.clear()追加:handlers.py
from agents.planner import run_planner
from agents.researcher import create_research_queries, run_researcher
from agents.writer import run_writer
from agents.reviewer import run_reviewer
from mcp_client import save_text_file, save_article, save_run_summary, web_search
from validators import validate_html_fragment, sanitize_html_fragment
from web_search_tool import web_search_results_to_text, web_search_results_to_json
from review_parser import (
parse_review_result,
review_result_to_text,
build_review_json_feedback,
build_unknown_review_feedback,
)
import event_types as events
MAX_SEARCH_RESULTS_PER_QUERY = 3
def build_validation_text(issues: list[str]) -> str:
"""
Python側HTMLチェック結果を保存用テキストに変換する。
"""
if not issues:
return "Python側HTMLチェック:OK\n"
lines = [
"Python側HTMLチェック:修正必要",
"",
]
for issue in issues:
lines.append(f"- {issue}")
return "\n".join(lines)
def build_validation_review(issues: list[str]) -> str:
"""
Python側チェックで見つかった問題を、
Writer AIに渡せるレビュー文に変換する。
"""
issue_text = "\n".join(f"- {issue}" for issue in issues)
return f"""
Python側のHTML形式チェックで、以下の問題が見つかりました。
{issue_text}
Writer AIへの修正指示:
・上記の問題をすべて修正してください。
・記事本文の内容は削除せず、維持してください。
・プレースホルダーに置き換えないでください。
・HTML断片として出力してください。
・styleタグ、style属性、CSS、JavaScriptは使わないでください。
・Markdown記法は使わず、HTMLタグに変換してください。
・必ず修正後の記事全文を出力してください。
status:
VALIDATION_FAILED
"""
def handle_planner(state, event_bus, event_queue):
"""
Planner AIを実行し、記事設計を作成する。
"""
print("\n[1] Planner AI が記事設計を作成中...")
state.plan = run_planner(state.keyword)
plan_path = f"{state.run_dir}/plan.txt"
save_text_file(plan_path, state.plan)
event_record = event_bus.emit(
events.PLAN_CREATED,
status="OK",
payload={
"path": plan_path,
},
)
event_queue.push(event_record)
def handle_researcher(state, event_bus, event_queue):
"""
Researcher AIで検索クエリ作成、Web検索、調査メモ作成まで行う。
"""
print("\n[2] Researcher AI が検索クエリを作成中...")
state.research_queries = create_research_queries(
keyword=state.keyword,
plan=state.plan,
)
state.research_query_text = "\n".join(f"- {query}" for query in state.research_queries)
research_query_path = f"{state.run_dir}/research_query.txt"
save_text_file(research_query_path, state.research_query_text)
event_bus.emit(
events.RESEARCH_QUERY_CREATED,
status="OK",
payload={
"path": research_query_path,
"queries": state.research_queries,
},
)
print("\n--- Researcher AI の検索クエリ ---")
print(state.research_query_text)
print("--- 検索クエリここまで ---")
print("\n[3] Brave Search APIでWeb検索中...")
all_search_results = []
for query in state.research_queries:
print(f"- 検索中: {query}")
result = web_search(
query=query,
max_results=MAX_SEARCH_RESULTS_PER_QUERY,
)
all_search_results.append(result)
event_bus.emit(
events.WEB_SEARCH_COMPLETED,
status=result.get("status", "UNKNOWN").upper(),
payload={
"query": query,
"provider": result.get("provider", ""),
"result_count": len(result.get("results", [])),
"message": result.get("message", ""),
},
)
search_results_json = web_search_results_to_json(all_search_results)
state.search_results_text = web_search_results_to_text(all_search_results)
web_search_results_path = f"{state.run_dir}/web_search_results.json"
save_text_file(web_search_results_path, search_results_json)
event_bus.emit(
events.WEB_SEARCH_RESULTS_SAVED,
status="OK",
payload={
"path": web_search_results_path,
},
)
print("\n[4] Researcher AI が調査メモを作成中...")
state.research_note = run_researcher(
keyword=state.keyword,
plan=state.plan,
search_results_text=state.search_results_text,
)
research_note_path = f"{state.run_dir}/research_note.txt"
save_text_file(research_note_path, state.research_note)
print("\n--- Researcher AI の調査メモ ---")
print(state.research_note)
print("--- 調査メモここまで ---")
event_record = event_bus.emit(
events.RESEARCH_COMPLETED,
status="OK",
payload={
"path": research_note_path,
},
)
event_queue.push(event_record)
def handle_writer(state, event_bus, event_queue, revision_source: str = ""):
"""
Writer AIで初稿または修正版を作成する。
"""
if state.loop_count == 0 and not state.article:
print("\n[5] Writer AI が初稿を作成中...")
state.article = run_writer(
keyword=state.keyword,
plan=state.plan,
research_note=state.research_note,
)
state.article = sanitize_html_fragment(state.article)
draft_path = f"{state.run_dir}/draft_1.html"
save_text_file(draft_path, state.article)
event_record = event_bus.emit(
events.DRAFT_CREATED,
status="OK",
payload={
"loop": 0,
"path": draft_path,
},
)
event_queue.push(event_record)
return
print(f"\n[7] Writer AI が指摘に従って修正中... ({state.loop_count}/{state.max_review_loops})")
if revision_source == "validator":
review_for_writer = build_validation_review(state.final_validation_issues)
elif state.final_review_status == "REVISION_REQUIRED":
review_for_writer = build_review_json_feedback(state.latest_review_result)
else:
review_for_writer = build_unknown_review_feedback(
state.latest_review,
state.latest_review_result,
)
event_bus.emit(
events.WRITER_REVISION_REQUESTED,
status="REVISION_REQUIRED",
payload={
"loop": state.loop_count,
"source": revision_source or "unknown",
},
)
state.article = run_writer(
keyword=state.keyword,
plan=state.plan,
research_note=state.research_note,
review=review_for_writer,
previous_draft=state.article,
)
state.article = sanitize_html_fragment(state.article)
next_draft_number = state.loop_count + 1
revised_draft_path = f"{state.run_dir}/draft_{next_draft_number}.html"
save_text_file(revised_draft_path, state.article)
event_record = event_bus.emit(
events.DRAFT_REVISED,
status="OK",
payload={
"loop": state.loop_count,
"source": revision_source or "unknown",
"path": revised_draft_path,
},
)
event_queue.push(event_record)
def handle_reviewer(state, event_bus, event_queue):
"""
Reviewer AIを実行する。
"""
state.loop_count += 1
print(f"\n[6] Reviewer AI がレビュー中... ({state.loop_count}/{state.max_review_loops})")
state.latest_review = run_reviewer(
keyword=state.keyword,
draft=state.article,
)
review_path = f"{state.run_dir}/review_{state.loop_count}.txt"
save_text_file(review_path, state.latest_review)
print("\n--- Reviewer AI のレビュー結果 ---")
print(state.latest_review)
print("--- レビュー結果ここまで ---")
state.latest_review_result = parse_review_result(state.latest_review)
state.final_review_status = state.latest_review_result["status"]
review_result_path = f"{state.run_dir}/review_result_{state.loop_count}.json"
save_text_file(
review_result_path,
review_result_to_text(state.latest_review_result),
)
print("\n--- Reviewer AI の判定JSON ---")
print(review_result_to_text(state.latest_review_result))
print("--- 判定JSONここまで ---")
event_record = event_bus.emit(
events.REVIEW_COMPLETED,
status=state.final_review_status,
payload={
"loop": state.loop_count,
"review_path": review_path,
"review_result_path": review_result_path,
"summary": state.latest_review_result.get("summary", ""),
"issues": state.latest_review_result.get("issues", []),
"next_action": state.latest_review_result.get("next_action", ""),
},
)
event_queue.push(event_record)
def handle_validator(state, event_bus, event_queue):
"""
Python側HTMLチェックを実行する。
"""
state.article = sanitize_html_fragment(state.article)
validation_issues = validate_html_fragment(state.article)
state.final_validation_issues = validation_issues
validation_text = build_validation_text(validation_issues)
validation_path = f"{state.run_dir}/validation_{state.loop_count}.txt"
save_text_file(validation_path, validation_text)
validation_status = "FAILED" if validation_issues else "OK"
if validation_issues:
print("\nPython側HTMLチェック:修正必要")
for issue in validation_issues:
print(f"- {issue}")
event_record = event_bus.emit(
events.VALIDATION_COMPLETED,
status=validation_status,
payload={
"loop": state.loop_count,
"path": validation_path,
"issues": validation_issues,
},
)
event_queue.push(event_record)
def handle_save_article(state, event_bus, event_queue):
"""
最終記事を保存する。
"""
print("\n[8] 最終HTMLを保存中...")
state.article = sanitize_html_fragment(state.article)
result = save_article(state.run_dir, state.article)
state.final_article_path = result["path"]
event_record = event_bus.emit(
events.ARTICLE_SAVED,
status="OK",
payload={
"path": result["path"],
"review_passed": state.review_passed,
"final_review_status": state.final_review_status,
},
)
event_queue.push(event_record)
def handle_run_summary(state, event_bus):
"""
実行概要を保存する。
"""
status_text = "OK判定後に保存" if state.review_passed else "最大レビュー回数到達後に保存"
if state.final_validation_issues:
validation_summary = "\n".join(f"- {issue}" for issue in state.final_validation_issues)
else:
validation_summary = "Python側HTMLチェック:OK"
review_json_summary = review_result_to_text(state.latest_review_result)
summary = f"""実行キーワード:{state.keyword}
保存状態:{status_text}
Reviewer最終JSON判定:{state.final_review_status}
最終保存先:{state.final_article_path}
レビュー最大回数:{state.max_review_loops}
実行結果フォルダ:{state.run_dir}
検索クエリ:
{state.research_query_text}
最終Reviewer判定JSON:
{review_json_summary}
最終Python側HTMLチェック:
{validation_summary}
イベントログ:
{state.run_dir}/event_log.json
"""
save_run_summary(state.run_dir, summary)
event_bus.emit(
events.PROCESS_COMPLETED,
status="OK" if state.review_passed else "COMPLETED_WITH_MAX_LOOPS",
payload={
"keyword": state.keyword,
"run_dir": state.run_dir,
"final_article_path": state.final_article_path,
"review_passed": state.review_passed,
},
)追加:dispatcher.py
import event_types as events
from handlers import (
handle_planner,
handle_researcher,
handle_writer,
handle_reviewer,
handle_validator,
handle_save_article,
handle_run_summary,
)
class Dispatcher:
"""
第8回:イベントを振り分けるDispatcher。
Event Queueから取り出したイベントを見て、
次にどのHandlerを呼ぶかを決める。
"""
def __init__(self, state, event_bus, event_queue):
self.state = state
self.event_bus = event_bus
self.event_queue = event_queue
self.finished = False
def dispatch(self, event_record: dict) -> None:
"""
イベント1件を処理する。
"""
event = event_record.get("event")
status = event_record.get("status")
if event == events.PROCESS_STARTED:
handle_planner(self.state, self.event_bus, self.event_queue)
return
if event == events.PLAN_CREATED:
handle_researcher(self.state, self.event_bus, self.event_queue)
return
if event == events.RESEARCH_COMPLETED:
handle_writer(self.state, self.event_bus, self.event_queue)
return
if event in (events.DRAFT_CREATED, events.DRAFT_REVISED):
handle_reviewer(self.state, self.event_bus, self.event_queue)
return
if event == events.REVIEW_COMPLETED:
self._handle_review_completed(status)
return
if event == events.VALIDATION_COMPLETED:
self._handle_validation_completed(status)
return
if event == events.ARTICLE_APPROVED:
handle_save_article(self.state, self.event_bus, self.event_queue)
return
if event == events.ARTICLE_SAVED:
handle_run_summary(self.state, self.event_bus)
self.finished = True
return
def _handle_review_completed(self, status: str) -> None:
"""
Reviewer AIの判定結果に応じて分岐する。
"""
if status == "OK":
handle_validator(self.state, self.event_bus, self.event_queue)
return
if status == "REVISION_REQUIRED":
self.event_bus.emit(
events.ARTICLE_REVISION_REQUIRED,
status="REVISION_REQUIRED",
payload={
"loop": self.state.loop_count,
"source": "reviewer",
"issues": self.state.latest_review_result.get("issues", []),
},
)
if self._can_continue_revision():
handle_writer(
self.state,
self.event_bus,
self.event_queue,
revision_source="reviewer",
)
return
self.event_bus.emit(
events.REVIEW_UNKNOWN,
status="UNKNOWN",
payload={
"loop": self.state.loop_count,
"reason": "Reviewer AI の判定JSONが不明です。",
},
)
if self._can_continue_revision():
handle_writer(
self.state,
self.event_bus,
self.event_queue,
revision_source="unknown_review",
)
def _handle_validation_completed(self, status: str) -> None:
"""
Validatorの結果に応じて分岐する。
"""
if status == "OK":
self.state.review_passed = True
event_record = self.event_bus.emit(
events.ARTICLE_APPROVED,
status="OK",
payload={
"loop": self.state.loop_count,
"review_status": self.state.final_review_status,
"validation_status": status,
},
)
self.event_queue.push(event_record)
return
self.event_bus.emit(
events.VALIDATION_FAILED,
status="FAILED",
payload={
"loop": self.state.loop_count,
"source": "validator",
"issues": self.state.final_validation_issues,
},
)
if self._can_continue_revision():
handle_writer(
self.state,
self.event_bus,
self.event_queue,
revision_source="validator",
)
def _can_continue_revision(self) -> bool:
"""
最大レビュー回数に達していないか確認する。
"""
if self.state.loop_count >= self.state.max_review_loops:
print("\n最大レビュー回数に達しました。これ以上修正せず、最後の記事を保存します。")
self.event_bus.emit(
events.REVIEW_MAX_LOOPS_REACHED,
status="MAX_LOOPS_REACHED",
payload={
"loop": self.state.loop_count,
"max_review_loops": self.state.max_review_loops,
"review_status": self.state.final_review_status,
"validation_issues": self.state.final_validation_issues,
},
)
event_record = self.event_bus.emit(
events.ARTICLE_APPROVED,
status="MAX_LOOPS_REACHED",
payload={
"loop": self.state.loop_count,
"reason": "最大レビュー回数に達したため、最後の記事を保存します。",
},
)
self.event_queue.push(event_record)
return False
return True追加: runtime_state.py
from dataclasses import dataclass, field
@dataclass
class RuntimeState:
"""
第8回:イベント駆動型処理で共有する実行状態。
main.py / Dispatcher / Handler 間で、
keyword、plan、research_note、article、review結果などを
受け渡すために使う。
第8回では、Event Queue と Dispatcher によって処理を分離するため、
各処理で必要になる値を RuntimeState に集約する。
"""
# 基本情報
keyword: str
run_dir: str
# Planner AI の出力
plan: str = ""
# Researcher AI / Web検索 関連
research_queries: list[str] = field(default_factory=list)
research_query_text: str = ""
search_results_text: str = ""
research_note: str = ""
# Writer AI の出力
article: str = ""
# Reviewer AI の出力
latest_review: str = ""
latest_review_result: dict = field(default_factory=dict)
# ループ制御
loop_count: int = 0
max_review_loops: int = 3
# 最終判定
review_passed: bool = False
final_review_status: str = "UNKNOWN"
final_validation_issues: list[str] = field(default_factory=list)
# 保存結果
final_article_path: str = ""変更:main.py
from datetime import datetime
import re
from event_bus import EventBus
from event_queue import EventQueue
from runtime_state import RuntimeState
from dispatcher import Dispatcher
import event_types as events
MAX_REVIEW_LOOPS = 3
def slugify_keyword(keyword: str, max_length: int = 40) -> str:
"""
入力キーワードをフォルダ名に使いやすい形へ整える。
日本語は残しつつ、ファイル名に使いにくい記号を除去する。
"""
slug = re.sub(r'[\\/:*?"<>|]+', "_", keyword)
slug = re.sub(r"\s+", "_", slug)
slug = slug.strip("_")
if not slug:
slug = "article"
return slug[:max_length]
def save_event_log(run_dir: str, event_bus: EventBus) -> None:
"""
EventBusに記録されたイベント履歴を event_log.json として保存する。
"""
event_log_path = f"{run_dir}/event_log.json"
with open(event_log_path, "w", encoding="utf-8") as file:
file.write(event_bus.to_json())
def run_dispatcher_loop(
dispatcher: Dispatcher,
event_queue: EventQueue,
event_bus: EventBus,
run_dir: str,
) -> None:
"""
第8回:疑似非同期のイベントキュー処理。
asyncio.Queue はまだ使わず、
EventQueue に積まれたイベントを1件ずつ取り出し、
Dispatcher に渡して処理する。
Dispatcher / Handler の処理中に新しいイベントが発行された場合、
そのイベントが再び EventQueue に積まれ、次のループで処理される。
"""
while event_queue.has_events() and not dispatcher.finished:
event_record = event_queue.pop()
if event_record is None:
break
dispatcher.dispatch(event_record)
# 途中経過も確認できるように、イベント処理ごとに event_log.json を更新する
save_event_log(run_dir, event_bus)
def main():
print("Gemma 4ローカル・マルチエージェント記事作成システム")
print("第8回:Dispatcher + Event Queue 版")
print("--------------------------------------------------")
keyword = input("記事キーワードを入力してください: ").strip()
if not keyword:
print("キーワードが空です。処理を終了します。")
return
now = datetime.now()
keyword_slug = slugify_keyword(keyword)
run_dir = f"output/{now:%Y%m%d_%H%M%S}_{keyword_slug}"
print(f"\n実行結果フォルダ: {run_dir}")
event_bus = EventBus()
event_queue = EventQueue()
state = RuntimeState(
keyword=keyword,
run_dir=run_dir,
max_review_loops=MAX_REVIEW_LOOPS,
)
dispatcher = Dispatcher(
state=state,
event_bus=event_bus,
event_queue=event_queue,
)
# 最初のイベントを発行し、Event Queue に積む
start_event = event_bus.emit(
events.PROCESS_STARTED,
status="OK",
payload={
"keyword": keyword,
"run_dir": run_dir,
"max_review_loops": MAX_REVIEW_LOOPS,
"mode": "pseudo_async_event_queue",
},
)
event_queue.push(start_event)
try:
run_dispatcher_loop(
dispatcher=dispatcher,
event_queue=event_queue,
event_bus=event_bus,
run_dir=run_dir,
)
# 念のため、最後にもイベントログを保存する
save_event_log(run_dir, event_bus)
print("\n完了しました。")
print(f"保存先: {state.final_article_path}")
print(f"実行結果フォルダ: {run_dir}")
print(f"イベントログ: {run_dir}/event_log.json")
print(f"Reviewer最終JSON判定:{state.final_review_status}")
if state.review_passed:
print("保存状態:Reviewer AI のJSON判定とPython側HTMLチェックのOK後に保存しました。")
else:
print("保存状態:最大レビュー回数到達後に最後の記事を保存しました。")
except Exception as error:
error_event = event_bus.emit(
events.PROCESS_FAILED,
status="ERROR",
payload={
"message": str(error),
},
)
event_queue.push(error_event)
save_event_log(run_dir, event_bus)
print("\nエラーが発生しました。")
print(str(error))
print(f"イベントログ: {run_dir}/event_log.json")
if __name__ == "__main__":
main()main.py で変わる点
第7回までの main.py は、次の処理判断をほぼすべて持っていました。
Plannerを実行する
Researcherを実行する
Writerを実行する
Reviewer結果を見る
OKならValidatorへ進む
修正必要ならWriterへ戻す
保存する第8回では、この判断を Dispatcher に移します。
main.py
↓
最初のイベント process.started を発行
↓
EventQueue に積む
↓
Dispatcher が event / status を見て処理を振り分けるつまり main.py は、処理本体ではなく起動役に近づきます。
第8回AI記事作成の実行結果
実際に以下のキーワードで実行しました。
2026年時点の最新AIモデル実行中、1回目のReviewer AIでは判定JSONが見つからず、UNKNOWN になりました。
そのため、Dispatcherは安全側に判断し、Writer AIへ修正を依頼しました。
その後、2回目のレビューでは REVISION_REQUIRED が返り、さらに修正が行われました。
3回目のレビューでは、次のようなJSONが返りました。
{
"status": "OK",
"summary": "構成、論理性、網羅性、読みやすさのすべてが高レベルで保たれており、プロフェッショナルな記事として即座に公開可能です。",
"issues": [
"最後のCTA要素がメインコンテンツのタグ構造の外側に独立している点。構造的な連続性を保つため、HTMLの囲みタグでラップするか、メインコンテンツの最後尾に統合すると、より完成度が高まります。"
],
"next_action": "SAVE"
}その後、Python側HTMLチェックもOKとなり、最終記事が保存されました。
Reviewer AI JSON判定 + Python側HTMLチェック の判定:OK
[8] 最終HTMLを保存中...
完了しました。
保存先: output/20260520_034447_2026年時点の最新AIモデル/final_article.html
保存状態:Reviewer AI のJSON判定とPython側HTMLチェックのOK後に保存しました。
Reviewer最終JSON判定:OK
イベントログ: output/20260520_034447_2026年時点の最新AIモデル/event_log.jsonこの結果から、Dispatcherによる分岐処理が機能していることを確認できます。
ターミナル出力
(UTF-8)
(SHIFT-JIS)
出力記事
(UTF-8)
(SHIFT-JIS)
event_log.json
(UTF-8)
(SHIFT-JIS)
第8回AI記事作成で保存されるファイル
実行後、output/日時_キーワード/ 配下に以下のファイルが保存されます。
plan.txt
research_query.txt
web_search_results.json
research_note.txt
draft_1.html
review_1.txt
review_result_1.json
draft_2.html
review_2.txt
review_result_2.json
draft_3.html
review_3.txt
review_result_3.json
validation_3.txt
final_article.html
run_summary.txt
event_log.json特に重要なのは、次の3つです。
| ファイル | 確認内容 |
|---|---|
event_log.json | イベントが期待通りに流れたか |
review_result_*.json | Reviewer AIのJSON判定が取得できたか |
final_article.html | 最終記事が保存されたか |
event_log.json を見ることで、処理がどの順番で進んだか、どこで修正ループに入ったかを確認できます。
第9回AI記事作成へ向けて
第8回では、Event Queueを導入しましたが、処理はまだ同期的に進みます。
つまり、1つのイベントを取り出し、処理が終わったら次のイベントへ進む形です。
第9回AI記事作成では、この構成をさらに発展させ以下のような要素を追加する予定です。
asyncio.Queue
非同期Dispatcher
Worker
リトライ制御
タイムアウト制御
失敗イベントの再投入
第8回AI記事作成で作った EventQueue、Dispatcher、Handler の考え方は、そのまま第9回の非同期化につながります。
第8回AI記事作成は、本格的な非同期イベント駆動型へ進むための重要な土台といえます。
まとめ:main.py中心からDispatcher中心へ
第8回では、AI記事作成システムをイベントキュー型マルチエージェントへ発展させました。
これまでの構成では、main.py が処理の中心でした。
main.py がPlannerを呼ぶ
main.py がResearcherを呼ぶ
main.py がWriterを呼ぶ
main.py がReviewer結果を判定する
main.py が保存する第8回では、これを次の形へ変更しました。
main.py は最初のイベントを発行する
EventQueue がイベントを保持する
Dispatcher がイベントを見て処理を判断する
Handler が実際の処理を実行するこの変更により、AIエージェントごとの役割が整理され、修正ループやエラー処理もイベントとして扱いやすくなりました。
第8回AI記事作成は、まだ完全な非同期処理ではありません。
しかし、イベント駆動型マルチエージェントの考え方を実装として理解するには、非常に重要なステップです。
次回は、この構成を asyncio.Queue とWorkerへ発展させ、本格的な非同期イベント駆動型へ進めます。
予定しているテーマは次の通りです。
【第9回】AI記事作成を非同期イベント駆動型へ発展させる
|asyncio.QueueとWorkerでマルチエージェントを並行制御する
