Guide

WebSocket (WS) guide for agents and operators

Robots Center now exposes a realtime transport on /socket. Use it to stream command delivery, trace lifecycle updates, replay status, and approval queue changes without polling.

Step 1

Authenticate over HTTP and mint a short-lived socket token with /api/v1/socket_tokens.

Step 2

Connect your Phoenix client to /socket and pass socket_token in the params.

Step 3

Join a scoped topic, then exchange lifecycle events such as command.dispatch, robot.status_change, and mission.status_update.

Companion docs

Access model

Operator browser sessions can join workspace, approvals, and fleet topics. Service-agent socket tokens can join agent, trace, and replay topics, subject to the granted scopes.

HTTP bootstrap

Mint a socket token

POST /api/v1/socket_tokens
Authorization: Bearer {access_token}
Content-Type: application/json

Response 200
{
  "socket_token": "SFMyNTY...",
  "expires_in": 600,
  "workspace_id": "e65ef764-9b2c-4e24-b918-99c7be33506a",
  "service_agent_id": "a91720d1-1c45-4343-bb45-786e20432f04",
  "scopes": [
    "sockets:connect",
    "events:read",
    "agent_commands:read",
    "agent_commands:write"
  ],
  "socket_path": "/socket"
}
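
The request above can be wrapped in a small helper. The following is a minimal sketch, assuming a runtime with a global fetch (current browsers or Node 18+). The names buildSocketTokenRequest, tokenExpiresAt, mintSocketToken, and the baseUrl argument are illustrative and not part of the platform API. The text above does not say whether the endpoint expects a request body, so the sketch sends none.

```javascript
// Build the HTTP request described above. `baseUrl` is a hypothetical
// deployment origin such as "https://robots.example.com".
function buildSocketTokenRequest(baseUrl, accessToken) {
  return {
    url: new URL("/api/v1/socket_tokens", baseUrl).toString(),
    options: {
      method: "POST",
      headers: {
        Authorization: `Bearer ${accessToken}`,
        "Content-Type": "application/json"
      }
    }
  }
}

// Convert the relative `expires_in` (seconds) into an absolute timestamp (ms).
function tokenExpiresAt(tokenResponse, nowMs = Date.now()) {
  return nowMs + tokenResponse.expires_in * 1000
}

// Mint a token. `fetchImpl` is injectable so the helper can be exercised offline.
async function mintSocketToken(baseUrl, accessToken, fetchImpl = fetch) {
  const {url, options} = buildSocketTokenRequest(baseUrl, accessToken)
  const response = await fetchImpl(url, options)
  if (!response.ok) {
    throw new Error(`socket token request failed with HTTP ${response.status}`)
  }
  return response.json()
}
```

With expires_in at 600 seconds, a long-running client would likely need to mint a fresh token before reconnecting; tokenExpiresAt gives a timestamp to schedule that against.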

Phoenix client

Connect and join

/socket
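import {Socket} from "phoenix"

// socket_token and serviceAgentId come from the /api/v1/socket_tokens
// response (the socket_token and service_agent_id fields).
const socket = new Socket("/socket", {
  params: {socket_token}
})

socket.connect()

const channel = socket.channel(`agent:${serviceAgentId}`, {
  version: "1.0.0",
  capabilities: ["deploy.workflow"]
})

// Announce readiness once the join succeeds; log the reply if it fails.
channel.join()
  .receive("ok", () => {
    channel.push("agent.ready", {
      version: "1.0.0",
      capabilities: ["deploy.workflow"]
    })
  })
  .receive("error", reply => console.error("join failed", reply))

channel.on("command.dispatch", envelope => {
  const commandId = envelope.data.id

  // Acknowledge the work before starting it.
  channel.push("command.accepted", {command_id: commandId})

  // Execute the command, then report completion or failure.
  channel.push("command.complete", {
    command_id: commandId,
    result_payload: {status: "ok"}
  })
})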
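
The example above only shows the success path. A minimal sketch of a handler that also reports failures with command.fail could look like the following. It assumes the envelope.data.id shape used above; handleDispatch, the execute callback, and the "execution_error" code are hypothetical names chosen for illustration.

```javascript
// Acknowledge a dispatched command, run it, and report the outcome.
// `execute` is a hypothetical async function supplied by the agent; it
// receives the dispatched command and resolves with a result payload.
async function handleDispatch(channel, envelope, execute) {
  const command = envelope.data
  const commandId = command.id

  channel.push("command.accepted", {command_id: commandId})

  try {
    const result = await execute(command)
    channel.push("command.complete", {
      command_id: commandId,
      result_payload: result
    })
  } catch (error) {
    // "execution_error" is an assumed code; the documented error_payload
    // example only shows `code` and `message` keys.
    const message = String((error && error.message) || error)
    channel.push("command.fail", {
      command_id: commandId,
      error_payload: {code: "execution_error", message}
    })
  }
}
```

It would be wired up as channel.on("command.dispatch", envelope => handleDispatch(channel, envelope, runCommand)), where runCommand is the agent's own implementation.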

Topics

Channel map

Topic | Audience | Purpose
agent:{service_agent_id} | Service agent | Command delivery, command lifecycle updates, readiness, and heartbeats.
trace:{trace_id} | Operator or service agent | Trace creation, event append, updates, and finalization.
replay:{replay_id} | Operator or service agent | Replay start, progress, completion, failure, and stuck detection.
workspace:{workspace_id} | Operator | Workspace-level summaries and shared lifecycle events.
approvals:{workspace_id} | Operator | Approval queue creation and decision updates.
fleet:{workspace_id} | Operator | Workspace-wide fleet events including robot status changes, mission lifecycle updates, fleet alert notifications, and diagnostic recordings.
fleet:robots:{robot_id} | Operator | Per-robot events including heartbeats, status changes, diagnostics, and OTA update status changes.
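
Topic strings from the channel map can be built in one place so that typos fail fast on the client. This is a small sketch; the kind keys (such as "robot") and the topicFor helper are illustrative names, while the prefixes themselves are taken from the map above.

```javascript
// Topic prefixes from the channel map, keyed by an illustrative kind name.
const TOPIC_PREFIXES = new Map([
  ["agent", "agent"],
  ["trace", "trace"],
  ["replay", "replay"],
  ["workspace", "workspace"],
  ["approvals", "approvals"],
  ["fleet", "fleet"],
  ["robot", "fleet:robots"]
])

// Build a topic string such as "fleet:robots:<robot_id>".
function topicFor(kind, id) {
  const prefix = TOPIC_PREFIXES.get(kind)
  if (!prefix) throw new Error(`unknown topic kind: ${kind}`)
  if (!id) throw new Error(`missing id for ${kind} topic`)
  return `${prefix}:${id}`
}
```

For example, socket.channel(topicFor("trace", traceId)) would join the trace topic for a given trace.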

Scopes

What credentials need

sockets:connect

Required to mint and use a service-agent socket token.

events:read

Required for service agents joining trace and replay topics.

agent_commands:read

Required for a service agent to join its own agent:{service_agent_id} topic.

agent_commands:write

Required to publish command.accepted, command.progress, command.complete, and command.fail.
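
A client can compare the scopes array returned with the socket token against these requirements before attempting a join or push. The sketch below encodes the list above; the action names and the missingScopes helper are hypothetical, and the server remains the authority on what is actually allowed.

```javascript
// Scope requirements from the list above, keyed by an illustrative action name.
const REQUIRED_SCOPES = new Map([
  ["connect", ["sockets:connect"]],
  ["joinTrace", ["events:read"]],
  ["joinReplay", ["events:read"]],
  ["joinAgent", ["agent_commands:read"]],
  ["publishCommandUpdates", ["agent_commands:write"]]
])

// Return the scopes that `action` needs but `granted` lacks.
// A null or undefined `granted` list is treated as empty.
function missingScopes(granted, action) {
  const required = REQUIRED_SCOPES.get(action)
  if (!required) throw new Error(`unknown action: ${action}`)
  const have = new Set(granted || [])
  return required.filter(scope => !have.has(scope))
}
```

An empty result means the token's scopes cover the action as documented here.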

Client to server

Events sent by connected agents

agent.ready

Declare capabilities and trigger queued command dispatch.

Field Description
version string -- agent software version
capabilities array<string> -- declared capability identifiers (e.g., ["deploy.workflow"])

Reply: %{status: "ready"}

agent.heartbeat

Refresh agent liveness metadata without rejoining.

Field Description
metadata object -- arbitrary liveness metadata (e.g., %{load: 0.7, uptime_seconds: 3600})

Reply: %{status: "heartbeat_received"}
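
A periodic heartbeat can be sketched as follows. The 30-second default interval is an assumption, since no required cadence is stated here, and startHeartbeat and getMetadata are illustrative names. The channel argument only needs a push(event, payload) method, which the Phoenix channel provides.

```javascript
// Send one heartbeat immediately, then repeat every `intervalMs`.
// Returns a function that stops the timer.
function startHeartbeat(channel, getMetadata, intervalMs = 30000) {
  const beat = () => {
    channel.push("agent.heartbeat", {metadata: getMetadata()})
  }
  beat()
  const timer = setInterval(beat, intervalMs)
  return () => clearInterval(timer)
}
```

A caller might keep the returned stop function and invoke it when the channel closes, for example const stop = startHeartbeat(channel, () => ({load: 0.7, uptime_seconds: 3600})).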

command.accepted

Acknowledge that the service agent has accepted work.

Field Description
result_payload object -- optional initial result data
command_id string (UUID) -- required, the ID of the dispatched command

Reply: %{"command_id" => uuid, "status" => "accepted"}

command.progress

Publish incremental execution status or partial results.

Field Description
result_payload object -- incremental result data merged with previous progress
command_id string (UUID) -- required, the ID of the in-progress command

Reply: %{"command_id" => uuid, "status" => "running"}

command.complete

Mark a command as succeeded or cancelled with a final result payload.

Field Description
status string -- optional, "cancelled" to mark as cancelled (defaults to succeeded)
result_payload object -- final result data
command_id string (UUID) -- required, the ID of the completed command

Reply: %{"command_id" => uuid, "status" => "succeeded" | "cancelled"}

command.fail

Mark a command as failed or cancelled with an error payload.

Field Description
status string -- optional, "cancelled" to mark as cancelled (defaults to failed)
error_payload object -- error details, e.g. %{"code" => "timeout", "message" => "..."}
command_id string (UUID) -- required, the ID of the failed command

Reply: %{"command_id" => uuid, "status" => "failed" | "cancelled"}

Server to client

Events emitted by the platform

command.dispatch

Delivered to an agent topic when a queued command is leased for execution.

Field Description
id UUID -- command ID
status "dispatched"
payload object -- original command payload from the operator
command_type string -- application-defined command type (e.g., "deploy.workflow")
correlation_id string -- idempotency and tracing key
created_by_user %{id, email} -- operator who created the command
lease_expires_at ISO 8601 -- when the dispatch lease expires (60 seconds from dispatch)
service_agent %{id, name, slug} -- target agent summary
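
Because the dispatch carries lease_expires_at, an agent can check how much of the lease remains before it acknowledges the command. This is a minimal sketch; leaseRemainingMs is an illustrative helper, and what the platform does when a lease lapses is not described here.

```javascript
// Milliseconds left on the dispatch lease, never negative.
// Returns 0 when `lease_expires_at` is missing or unparseable.
function leaseRemainingMs(command, nowMs = Date.now()) {
  const expiresAt = Date.parse(command.lease_expires_at)
  if (Number.isNaN(expiresAt)) return 0
  return Math.max(0, expiresAt - nowMs)
}
```

An agent could log a warning or skip slow pre-checks when the remaining time is short.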

command.cancel

Broadcast when an operator cancels queued, dispatched, accepted, or running work.

Field Description
id UUID -- command ID
status "cancelled"
cancel_requested_at ISO 8601 -- when the cancel was requested
correlation_id string
error_payload %{code: "cancelled", message: "..."} -- cancellation reason
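
To react to command.cancel, an agent needs a way to stop work that is already running. One possible approach, sketched below, keeps an AbortController per in-flight command. createCommandRegistry and its methods are hypothetical names, and the sketch assumes the cancelled command's ID is available to the handler.

```javascript
// Track in-flight commands so a cancel event can abort the matching work.
function createCommandRegistry() {
  const controllers = new Map()
  return {
    // Register a command and get an AbortSignal to pass to the worker code.
    start(commandId) {
      const controller = new AbortController()
      controllers.set(commandId, controller)
      return controller.signal
    },
    // Abort a running command. Returns false if the ID is not in flight.
    cancel(commandId) {
      const controller = controllers.get(commandId)
      if (!controller) return false
      controller.abort()
      controllers.delete(commandId)
      return true
    },
    // Forget a command that finished on its own.
    finish(commandId) {
      controllers.delete(commandId)
    }
  }
}
```

If cancel events use the same envelope shape as command.dispatch (an assumption), the wiring would be channel.on("command.cancel", envelope => registry.cancel(envelope.data.id)).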

command.timed_out

Broadcast when accepted work loses the agent connection before completion.

Field Description
id UUID -- command ID
status "timed_out"
completed_at ISO 8601
correlation_id string
error_payload %{code: "agent_disconnected", message: "The agent disconnected before the command completed"}

trace.created

Emitted when a new trace is ingested.

Field Description
name string -- human-readable trace name
status "running" | "ok" | "error" | "partial"
workspace_id UUID
trace_id UUID -- trace identifier
trace_type "runtime" | "eval" | "replay"

trace.event.appended

Emitted when an event is appended to an existing trace.

Field Description
workspace_id UUID
trace_id UUID -- parent trace identifier
event_ids array<UUID> -- IDs of the appended events

trace.finalized

Emitted when a trace reaches a terminal status.

Field Description
status "ok" | "error" | "partial"
workspace_id UUID
trace_id UUID
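
The three trace events can be folded into a single client-side view of a trace. The reducer below is a sketch using only the fields listed above; reduceTrace and the shape of its state object are illustrative, and it assumes the handler receives the payload fields directly.

```javascript
// Fold trace.created, trace.event.appended, and trace.finalized into one state.
function reduceTrace(state, eventName, payload) {
  // Start from an empty view if events arrive before trace.created was seen.
  const base = state || {
    traceId: payload.trace_id,
    status: undefined,
    eventIds: [],
    finalized: false
  }
  switch (eventName) {
    case "trace.created":
      return {
        ...base,
        name: payload.name,
        traceType: payload.trace_type,
        status: payload.status
      }
    case "trace.event.appended":
      return {...base, eventIds: [...base.eventIds, ...payload.event_ids]}
    case "trace.finalized":
      return {...base, status: payload.status, finalized: true}
    default:
      return base
  }
}
```

Each channel.on handler for the trace topic would pass its event name and payload through reduceTrace and store the result.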

replay.started

Emitted when a replay begins execution.

Field Description
workspace_id UUID
replay_id UUID

replay.updated

Emitted when a replay reports progress.

Field Description
workspace_id UUID
replay_id UUID

replay.completed

Emitted when a replay finishes successfully.

Field Description
workspace_id UUID
replay_id UUID

replay.failed

Emitted when a replay fails.

Field Description
workspace_id UUID
replay_id UUID

approval_request.created

Emitted when a new approval request enters the queue.

Field Description
request object -- approval request details
workspace_id UUID

approval_request.decided

Emitted when an approval request is approved or rejected.

Field Description
request object -- updated approval request with decision
workspace_id UUID

agent.ready

Presence event reflected back through the canonical event bus.

Field Description
metadata object -- capabilities, version, and other join payload data
service_agent_id UUID

agent.heartbeat

Liveness event reflected back through the canonical event bus.

Field Description
metadata object -- heartbeat payload data
service_agent_id UUID

robot.heartbeat

Emitted on fleet:{workspace_id} and fleet:robots:{robot_id} when a robot sends a heartbeat with updated battery, location, or status.

Field Description
status "online" | "offline" | "charging" | "error" | "maintenance"
location object -- %{lat, lng, zone}
robot_id UUID
workspace_id UUID
battery_level integer (0-100) -- current battery percentage

robot.status_change

Emitted on fleet:{workspace_id} and fleet:robots:{robot_id} when a robot transitions between online, offline, charging, error, or maintenance status.

Field Description
robot_id UUID
workspace_id UUID
new_status "online" | "offline" | "charging" | "error" | "maintenance"
previous_status string -- status before transition
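
An operator dashboard can keep a per-robot status map current by applying each robot.status_change payload. This is a minimal sketch; applyRobotStatusChange and countByStatus are illustrative helpers, and payloads with a status outside the documented set are ignored as a defensive assumption.

```javascript
// Status values documented for new_status.
const ROBOT_STATUSES = new Set(["online", "offline", "charging", "error", "maintenance"])

// Return a new Map with the robot's status updated; unknown statuses are ignored.
function applyRobotStatusChange(statuses, payload) {
  if (!ROBOT_STATUSES.has(payload.new_status)) return statuses
  const next = new Map(statuses)
  next.set(payload.robot_id, payload.new_status)
  return next
}

// Summarize how many robots are in each status.
function countByStatus(statuses) {
  const counts = {}
  for (const status of statuses.values()) {
    counts[status] = (counts[status] || 0) + 1
  }
  return counts
}
```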

mission.status_update

Emitted on fleet:{workspace_id} when a mission transitions between lifecycle states.

Field Description
robot_id UUID | nil
workspace_id UUID
mission_id UUID
new_status "pending" | "assigned" | "in_progress" | "paused" | "completed" | "cancelled" | "failed"
previous_status string -- status before transition

fleet_alert.created

Emitted on fleet:{workspace_id} when a new fleet alert is generated.

Field Description
message string
title string
severity "info" | "warning" | "error" | "critical"
robot_id UUID | nil
workspace_id UUID
alert_type "battery_low" | "offline" | "error" | "maintenance_due" | "geofence_breach"
alert_id UUID

fleet_alert.acknowledged

Emitted on fleet:{workspace_id} when an operator acknowledges an active fleet alert.

Field Description
acknowledged_at ISO 8601
acknowledged_by UUID -- operator user ID
workspace_id UUID
alert_id UUID

fleet_alert.resolved

Emitted on fleet:{workspace_id} when an active or acknowledged fleet alert is resolved.

Field Description
resolved_at ISO 8601
resolved_by UUID -- operator user ID
workspace_id UUID
alert_id UUID
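
The three fleet alert events describe a simple lifecycle that a client can track by alert_id. The sketch below assumes the state names "active", "acknowledged", and "resolved", which mirror the wording above but are not confirmed field values; reduceAlerts and openAlerts are illustrative helpers.

```javascript
// Apply one fleet_alert.* event to a Map of alerts keyed by alert_id.
function reduceAlerts(alerts, eventName, payload) {
  const next = new Map(alerts)
  const current = next.get(payload.alert_id) || {alertId: payload.alert_id}
  switch (eventName) {
    case "fleet_alert.created":
      next.set(payload.alert_id, {
        ...current,
        title: payload.title,
        severity: payload.severity,
        alertType: payload.alert_type,
        state: "active"
      })
      break
    case "fleet_alert.acknowledged":
      next.set(payload.alert_id, {
        ...current,
        state: "acknowledged",
        acknowledgedAt: payload.acknowledged_at
      })
      break
    case "fleet_alert.resolved":
      next.set(payload.alert_id, {
        ...current,
        state: "resolved",
        resolvedAt: payload.resolved_at
      })
      break
    default:
      return alerts
  }
  return next
}

// Alerts that have not been resolved yet.
function openAlerts(alerts) {
  return [...alerts.values()].filter(alert => alert.state !== "resolved")
}
```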

diagnostic.recorded

Emitted on fleet:robots:{robot_id} when a new diagnostic metric reading is recorded for a robot.

Field Description
status "normal" | "warning" | "critical"
unit string -- e.g., "percent", "celsius"
metric_name string -- e.g., "battery_health", "motor_temperature"
metric_value number
robot_id UUID
workspace_id UUID
diagnostic_id UUID

ota_update.status_change

Emitted on fleet:robots:{robot_id} when an OTA update transitions between status values.

Field Description
update_type "firmware" | "software" | "config" | "security_patch"
robot_id UUID
workspace_id UUID
firmware_version string -- target version
new_status "pending" | "downloading" | "installing" | "completed" | "failed" | "rolled_back"
ota_update_id UUID
previous_status string

Limits and transport

Payload and connection constraints

max_frame_size

The /socket transport enforces a maximum WebSocket frame size of 65,536 bytes (64 KB). Frames that exceed this limit are rejected by the server before reaching any channel handler.

Payload size limit

Every handle_in callback in the agent channel validates the serialized payload size against the same 64 KB ceiling. Payloads exceeding this limit receive a payload_too_large error reply.
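
A client can estimate a payload's size before pushing it. The sketch below measures the UTF-8 byte length of the JSON encoding, which is an approximation: the server's exact measurement is not specified here, and the WebSocket frame also carries the topic and event name, so leaving headroom is prudent. MAX_PAYLOAD_BYTES, payloadByteLength, and fitsPayloadLimit are illustrative names.

```javascript
// 64 KB ceiling documented above.
const MAX_PAYLOAD_BYTES = 65536

// UTF-8 byte length of the JSON-encoded payload.
function payloadByteLength(payload) {
  return new TextEncoder().encode(JSON.stringify(payload)).length
}

// True when the encoded payload is at or under the documented ceiling.
function fitsPayloadLimit(payload) {
  return payloadByteLength(payload) <= MAX_PAYLOAD_BYTES
}
```

Large results could be checked with fitsPayloadLimit before a command.progress or command.complete push and trimmed or split if they do not fit.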

check_origin (production)

In production the endpoint enforces check_origin: [host, "https://#{host}"]. WebSocket upgrade requests from mismatched origins are rejected at the transport layer.

Error replies

Channel error responses

All error replies follow the shape %{reason: String.t()}. The table below covers every reason the agent channel may return.

Reason | Description
payload_too_large | Returned when a channel event payload exceeds the 64 KB (65,536 byte) limit. Applies to all client-to-server events.
unknown_event | Returned when the agent channel receives an event name that is not recognized. Check the event name against the documented client events.
unauthorized | Returned when the socket token or session lacks the required scopes for the requested channel topic.
insufficient_scope | Returned when the socket token is valid but does not include agent_commands:write, required for command.accepted, command.progress, command.complete, and command.fail.
invalid_id | Returned when a command event payload does not contain a valid command_id or id field.
invalid_payload | Returned when the payload is not a map (e.g., a string or list).
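
Client code can map each documented reason to a follow-up step. The mapping below is only a sketch: the reasons come from the table above, while the action labels and the describeError helper are hypothetical and would be adapted to each agent's own policy.

```javascript
// Suggested client reaction per documented reason (labels are illustrative).
const ERROR_ACTIONS = new Map([
  ["payload_too_large", "shrink_payload"],
  ["unknown_event", "fix_event_name"],
  ["unauthorized", "check_credentials"],
  ["insufficient_scope", "request_scope"],
  ["invalid_id", "fix_payload"],
  ["invalid_payload", "fix_payload"]
])

// Turn an error reply of shape {reason} into {reason, action}.
function describeError(reply) {
  const reason = reply ? reply.reason : undefined
  return {reason, action: ERROR_ACTIONS.get(reason) || "unknown"}
}
```

With the Phoenix client, this could be attached to a push as channel.push("command.progress", payload).receive("error", reply => console.warn(describeError(reply))).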

Connection lifecycle

Connect, join, and disconnect

Socket identity

Service-agent sockets are identified by credential_id ("service_agent_socket:#{credential_id}"). Operator sessions use a SHA-256 hash of the session token so the raw token never appears in the process identifier.

Presence tracking

On join, the agent channel tracks presence via Presence.track/3. Subsequent agent.ready and agent.heartbeat calls update presence metadata.

Disconnect cleanup

When a socket disconnects, terminate/2 calls Presence.untrack/2 to remove the presence entry before invoking handle_agent_disconnect, ensuring downstream consumers see an accurate presence list.

Scope guards

Topic join authorization (can_join_trace?, can_join_replay?) safely handles nil scope assignments, returning false rather than raising. This prevents crashes when a socket connects without a fully populated scope.