トップページ

api

REST API / MQTT リファレンス

JWT認証の流れから PATCH 実装、MQTT 購読例まで、初めての方にも分かりやすくまとめました。実際の運用で役立つエラーハンドリングの方法も解説します。

認証と基本リクエスト

  1. POST /api/login にユーザー名/パスワードを送信し、JWTを取得。
  2. 以降のRESTリクエストでは Authorization: Bearer <token> を付与。
  3. トークンは1時間有効。自動スクリプトは期限切れ前にリフレッシュするか再ログイン。
curl -X POST https://localhost:8443/api/login \\
  -H \"Content-Type: application/x-www-form-urlencoded\" \\
  -d 'username=admin&password=admin'

主要RESTエンドポイント

  • GET /api/flows, POST /api/flows, PATCH /api/flows/{id}
  • GET /api/address/buckets/overview, POST /api/address/buckets/child
  • GET /api/checker/latest, POST /api/automation/jobs/{id}/enable
  • GET /api/logs?kind=api|audit

MQTTトピック

  • mmam/flows/events — Flow作成/更新/削除(/all/flow/{id} にブロードキャスト)。
// MQTT.js例
client.subscribe('mmam/flows/events');
client.on('message', (_, payload) => {
  const event = JSON.parse(payload.toString());
  console.log(event.flow_id, event.event);
});

実践シナリオ: 認証からフロー更新まで

外部スクリプトから MMAM API を呼び出して、フロー情報を更新する一連の流れを実装します。 このシナリオでは、Python を使って「認証 → 検索 → 更新 → 確認」の4ステップを実行します。

ステップ1: JWT トークンを取得

まず、ログインして認証トークンを取得します。

import requests

API_BASE = "https://localhost:8443"

# ログインしてトークン取得
response = requests.post(
    f"{API_BASE}/login",
    json={"username": "admin", "password": "admin"},
    verify=False  # 自己署名証明書の場合
)
token = response.json()["access_token"]
headers = {"Authorization": f"Bearer {token}"}

ステップ2: 条件でフローを検索

Source IP と Multicast IP を指定してフローを検索します。

# フロー検索
params = {
    "source_addr_a": "192.168.10.101",
    "multicast_addr_a": "239.100.1.10"
}
response = requests.get(
    f"{API_BASE}/api/flows",
    headers=headers,
    params=params,
    verify=False
)
flows = response.json()
if not flows:
    print("フローが見つかりません")
    exit(1)

flow_id = flows[0]["flow_id"]
print(f"対象フロー: {flow_id}")

ステップ3: PATCH で部分更新

取得した flow_id を使って、必要なフィールドだけを更新します。

# フロー情報を部分更新
update_data = {
    "alias1": "Studio A Main Camera (Updated)",
    "user_field7": "Operator: Sarah Martinez",
    "user_field8": "Emergency: ext.1234"
}
response = requests.patch(
    f"{API_BASE}/api/flows/{flow_id}",
    headers=headers,
    json=update_data,
    verify=False
)

if response.status_code == 200:
    print("更新成功")
elif response.status_code == 423:
    print("エラー: フローがロックされています")
else:
    print(f"エラー: {response.status_code} - {response.text}")

ステップ4: 更新内容を確認

最後に GET で取得し、更新が反映されているか確認します。

# 更新後のフロー情報を取得
response = requests.get(
    f"{API_BASE}/api/flows/{flow_id}",
    headers=headers,
    verify=False
)
flow = response.json()
print(f"alias1: {flow['alias1']}")
print(f"user_field7: {flow['user_field7']}")
print(f"user_field8: {flow['user_field8']}")

MQTT リアルタイム購読の実装例

MQTT を使うと、フローの作成・更新・削除がリアルタイムで通知されます。 外部の監視システムや通知システムと連携する際に便利です。

Node.js での MQTT 購読例

const mqtt = require('mqtt');

// MQTT ブローカーに接続
const client = mqtt.connect('mqtt://localhost:1883', {
  username: 'mmam',
  password: 'your-mqtt-password'
});

client.on('connect', () => {
  console.log('MQTT接続成功');

  // フローイベントを購読(作成/更新/削除)
  client.subscribe('mmam/flows/events', (err) => {
    if (!err) console.log('購読開始: mmam/flows/events');
  });
});

client.on('message', (topic, payload) => {
  const event = JSON.parse(payload.toString());

  if (topic === 'mmam/flows/events') {
    console.log(`フローイベント: ${event.event}`);
    console.log(`  flow_id: ${event.flow_id}`);
    console.log(`  display_name: ${event.flow?.display_name}`);

    // Slackへ通知する例
    if (event.event === 'created') {
      notifySlack(`新しいフローが作成されました: ${event.flow?.display_name}`);
    }
  }
});

Python での MQTT 購読例

import paho.mqtt.client as mqtt
import json

def on_connect(client, userdata, flags, rc):
    print(f"MQTT接続: {rc}")
    client.subscribe("mmam/flows/events")

def on_message(client, userdata, msg):
    event = json.loads(msg.payload.decode())

    if msg.topic == "mmam/flows/events":
        print(f"フローイベント: {event['event']}")
        print(f"  flow_id: {event['flow_id']}")
        # データベースに記録する例
        log_to_database(event)

client = mqtt.Client()
client.username_pw_set("mmam", "your-mqtt-password")
client.on_connect = on_connect
client.on_message = on_message
client.connect("localhost", 1883, 60)
client.loop_forever()

エラーハンドリングとベストプラクティス

よくあるエラーコード

  • 401 Unauthorized: トークン期限切れ → 再ログイン
  • 404 Not Found: flow_id が存在しない
  • 423 Locked: フローがロック中 → 解除を待つ
  • 422 Unprocessable Entity: バリデーションエラー

主要RESTエンドポイント

  • POST /login — JWT認証
  • GET /api/flows — フロー検索
  • POST /api/flows — フロー作成
  • PATCH /api/flows/{id} — フロー更新
  • DELETE /api/flows/{id} — フロー削除
  • GET /api/address/buckets/overview — アドレスマップ取得
  • POST /api/address/buckets/child — 子バケット作成
  • GET /api/checker/latest — 最新の検証結果
  • GET /api/logs — ログ取得

システム連携のヒント

  • 外部システムのキーと flow_id を紐付けて管理
  • MQTT イベントを Webhook へ転送して Issue Tracker と連携
  • GET /api/flows/export で全データを取得し Git で管理
  • Checker の自動実行結果をメール/Slack に通知
  • CI/CD で POST /api/flows/import を使って環境同期