← Back to Plugins
Tools

Message Bus

htt501 By htt501 👁 3 views ▲ 0 votes

A2A Message Bus - Agent-to-Agent async message bus plugin for OpenClaw

GitHub

Install

npm install
```

Configuration Example

{
  "plugins": {
    "entries": {
      "openclaw-message-bus": {
        "enabled": true,
        "config": {
          "agents": ["main", "ops", "creator", "intel", "strategist"],
          "notify": {
            "enabled": true,
            "timeoutSeconds": 120,
            "replyChannel": "feishu",
            "replyTo": "chat:oc_YOUR_GROUP_CHAT_ID"
          }
        }
      }
    },
    "installs": {
      "openclaw-message-bus": {
        "source": "path",
        "installPath": "~/.openclaw/extensions/openclaw-message-bus",
        "version": "1.0.0",
        "spec": "~/.openclaw/extensions/openclaw-message-bus"
      }
    },
    "allow": ["openclaw-message-bus"]
  }
}

README

# openclaw-message-bus

Agent-to-Agent (A2A) async message bus plugin for [OpenClaw](https://github.com/nicktao/openclaw).

SQLite-backed message queue with priority routing, thread tracking, auto-retry, dead-letter handling, and CLI push notifications.

## Features

- **4 tools**: `bus_send`, `bus_read`, `bus_ack`, `bus_status`
- **SQLite persistence** with WAL mode for concurrent access
- **Priority queuing**: P0 (urgent) โ†’ P3 (low)
- **Thread tracking**: automatic conversation threading with configurable round limit (default 10)
- **Atomic read**: `UPDATE...RETURNING` ensures no duplicate delivery under concurrency
- **Auto-retry & dead-letter**: processing timeout โ†’ retry up to 3x โ†’ dead_letter
- **Cron jobs**: timeout revert (5min), fallback recovery (5min), expiry cleanup (1h), metrics logging (1h)
- **Fallback**: SQLite write failure โ†’ `/tmp/bus-fallback/` JSON files โ†’ auto-recovered by cron
- **CLI push notification**: fire-and-forget `openclaw agent` call to wake target agent

## Quick Start

### 1. Install

```bash
cd ~/.openclaw/extensions
git clone https://github.com/nicktao/openclaw-message-bus.git
cd openclaw-message-bus
npm install
```

### 2. Register plugin

Add to `~/.openclaw/openclaw.json` under `plugins.entries`:

```json
{
  "plugins": {
    "entries": {
      "openclaw-message-bus": {
        "enabled": true,
        "config": {
          "agents": ["main", "ops", "creator", "intel", "strategist"],
          "notify": {
            "enabled": true,
            "timeoutSeconds": 120,
            "replyChannel": "feishu",
            "replyTo": "chat:oc_YOUR_GROUP_CHAT_ID"
          }
        }
      }
    },
    "installs": {
      "openclaw-message-bus": {
        "source": "path",
        "installPath": "~/.openclaw/extensions/openclaw-message-bus",
        "version": "1.0.0",
        "spec": "~/.openclaw/extensions/openclaw-message-bus"
      }
    },
    "allow": ["openclaw-message-bus"]
  }
}
```

### 3. Restart gateway

```bash
launchctl stop ai.openclaw.gateway
launchctl start ai.openclaw.gateway
```

### 4. Verify

Check logs for `message-bus initialized`:
```bash
tail -f ~/.openclaw/logs/gateway.err.log
```

## Configuration

| Field | Type | Default | Description |
|-------|------|---------|-------------|
| `agents` | `string[]` | `[]` | Valid agent IDs. Empty = accept any agent |
| `notify.enabled` | `boolean` | `true` | Enable CLI push notification on send |
| `notify.timeoutSeconds` | `number` | `120` | CLI notification timeout |
| `notify.replyChannel` | `string` | `""` | Channel hint in notification (e.g. `feishu`) |
| `notify.replyTo` | `string` | `""` | Destination hint (e.g. group chat ID) |

## Tools

### bus_send โ€” Send a message

```js
bus_send({
  to: "ops",                    // target agent ID
  content: "Check disk space",  // message body (max 10KB)
  type: "request",              // task | discuss | notify | request | response | escalation
  priority: "P1",               // P0 | P1 | P2 | P3 (default P2)
  ref: "task-123",              // optional: correlation ID
  reply_to: "msg_main_xxx"     // optional: reply to a specific message
})
```

### bus_read โ€” Read pending messages

```js
bus_read({
  from: "main",    // optional: filter by sender
  type: "request", // optional: filter by type
  limit: 10        // optional: max messages (default 10)
})
// Reads messages addressed to the calling agent, sorted by priority then time
// Messages are atomically marked as 'delivered' on read
```

### bus_ack โ€” Acknowledge a message

```js
bus_ack({ msg_id: "msg_main_1234_00a1" })
// Idempotent: already-acked messages return ALREADY_ACKED
```

### bus_status โ€” Query message status

```js
bus_status({ msg_id: "msg_main_1234_00a1" })
// Returns full message record including status, timestamps, retry count
```

## Message Lifecycle

```
queued โ†’ delivered (on bus_read)
queued โ†’ expired (after 24h)
processing โ†’ queued (timeout retry, up to 3x)
processing โ†’ dead_letter (max retries exceeded)
dead_letter โ†’ expired (after 24h)
delivered โ†’ deleted (after 7 days)
```

## Agent Rules Template

Copy this to each agent's rules directory (e.g. `~/.openclaw/workspace-{agent}/rules/message-bus.md`):

```markdown
# Message Bus Rules

1. **On activation, always call bus_read first** โ€” check for pending messages
2. **After processing a message, call bus_ack** โ€” prevents duplicate delivery
3. **Use bus_send for agent-to-agent communication** โ€” not direct session messages
4. **Use appropriate priority** โ€” P0/P1 for urgent, P2 for normal, P3 for low
5. **Use reply_to when responding** โ€” maintains thread tracking
6. **Thread limit is 10 rounds** โ€” escalate if you hit ROUND_LIMIT
```

## Project Structure

```
openclaw-message-bus/
โ”œโ”€โ”€ index.js                    # Plugin entry point
โ”œโ”€โ”€ openclaw.plugin.json        # Plugin manifest & config schema
โ”œโ”€โ”€ package.json
โ”œโ”€โ”€ src/
โ”‚   โ”œโ”€โ”€ schema.js               # TypeBox schemas & constants
โ”‚   โ”œโ”€โ”€ db.js                   # SQLite layer (better-sqlite3)
โ”‚   โ”œโ”€โ”€ id.js                   # Message ID generator
โ”‚   โ”œโ”€โ”€ format.js               # Response formatting
โ”‚   โ”œโ”€โ”€ fallback.js             # /tmp fallback read/write
โ”‚   โ”œโ”€โ”€ cron.js                 # Scheduled jobs
โ”‚   โ””โ”€โ”€ tools/
โ”‚       โ”œโ”€โ”€ bus_send.js         # Send message tool
โ”‚       โ”œโ”€โ”€ bus_read.js         # Read messages tool
โ”‚       โ”œโ”€โ”€ bus_ack.js          # Acknowledge tool
โ”‚       โ””โ”€โ”€ bus_status.js       # Status query tool
โ””โ”€โ”€ tests/
    โ”œโ”€โ”€ unit/                   # Unit tests
    โ”œโ”€โ”€ integration/            # Integration tests
    โ””โ”€โ”€ property/               # Property-based tests
```

## Development

```bash
npm test                    # unit + property tests
npm run test:integration    # integration tests
```

## License

MIT
tools

Comments

Sign in to leave a comment

Loading comments...