How to Run Agentic Workflows With WebSockets

May 29, 2026

Agentic workflows often take longer than a normal HTTP request. An agent may plan, call tools, wait for search results, run an evaluator, retry a failed step, stream tokens, ask for approval, and return a final answer. If your UI waits for one response at the end, users see a spinner and your engineering team loses visibility into what happened.

WebSockets work well when you need one persistent connection that sends ordered workflow events back to the client as the agent runs. The key is to treat the WebSocket as an event transport, not as an unstructured log pipe.

In this tutorial, you will build a practical WebSocket pattern for agentic workflows with:

  • One persistent authenticated connection per workflow run
  • Ordered events with sequence numbers
  • Client-side UI updates for planning, tool calls, token streaming, and completion
  • Reconnect and resume support after disconnects
  • Backpressure handling
  • Debug traces for failed agent steps

When to Use WebSockets for Agentic Workflows

Use WebSockets when the server needs to push multiple updates to the client during one long-running workflow.

Good fits include:

  • An AI research agent that streams status updates while searching, reading, and summarizing sources
  • A coding agent that reports file reads, edits, tests, and command output
  • A support agent that calls tools and shows intermediate decisions before drafting a reply
  • A workflow builder that shows each prompt, model call, tool call, and evaluator result in order

Do not use WebSockets for every AI endpoint. For simple request and response tasks, regular HTTP is easier to secure, cache, test, and operate. A prompt that classifies one ticket, extracts JSON, or rewrites one paragraph usually does not need a persistent connection.

A reliable WebSocket setup for agent workflows usually has four parts:

  1. Client UI: Opens the socket, sends a start message, renders ordered events, and reconnects when needed.
  2. WebSocket gateway: Authenticates the user, validates messages, tracks connection state, and sends events.
  3. Workflow runner: Runs the agent steps, tool calls, LLM calls, evals, and retries.
  4. Event store: Persists workflow events so the client can recover after disconnects.

Screenshot: connection lifecycle for a persistent WebSocket workflow run (connect, authenticate, start workflow, stream events, reconnect, resume, complete).

The most important design choice is the event contract. If the server sends loose strings such as "Searching..." or "Calling tool", your client, tests, traces, and replay tools will become fragile. Send structured events instead.

Define a Structured Event Envelope

Every event should include enough information to order, resume, trace, and render it.

{
  "workflow_id": "wf_123",
  "run_id": "run_456",
  "seq": 12,
  "type": "tool.result",
  "ts": "2026-05-29T14:21:32.118Z",
  "trace_id": "trace_abc",
  "parent_event_id": "evt_011",
  "event_id": "evt_012",
  "payload": {
    "tool_name": "search_docs",
    "status": "success",
    "duration_ms": 842,
    "result_preview": "Found 8 matching documents"
  }
}

Use these fields consistently:

  • workflow_id: Stable ID for the workflow definition, such as customer_support_triage_v3.
  • run_id: Unique ID for one execution.
  • seq: Monotonic integer for ordering events within one run.
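  • type: Machine-readable event type, such as agent.plan, llm.token, or tool.error.
  • ts: ISO 8601 UTC timestamp of when the server emitted the event.
  • trace_id: ID used to inspect the full run in your tracing system.
  • event_id: Unique ID for this event. Child events reference it through parent_event_id.
  • parent_event_id: Optional ID that links a child event to a parent step.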
  • payload: Type-specific data.
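
One way to keep the envelope consistent is to build every event through a single typed constructor. The sketch below is a minimal, standard-library illustration of that idea, not part of the FastAPI example. The class name EventEnvelope, its defaults, and its validation rules are assumptions made for illustration.

```python
from dataclasses import dataclass, field, asdict
from datetime import datetime, timezone
from typing import Any, Optional
import json


@dataclass
class EventEnvelope:
    """Hypothetical typed wrapper around the event envelope fields."""
    workflow_id: str
    run_id: str
    seq: int
    type: str
    payload: dict[str, Any]
    trace_id: Optional[str] = None
    parent_event_id: Optional[str] = None
    # Assumed defaults: fill in ts and event_id when the caller omits them.
    ts: str = field(default_factory=lambda: datetime.now(timezone.utc).isoformat())
    event_id: str = ""

    def __post_init__(self):
        if not isinstance(self.seq, int) or self.seq < 1:
            raise ValueError("seq must be a positive integer")
        if not self.type:
            raise ValueError("type must be a non-empty string")
        if not self.event_id:
            # Derive a stable ID from the sequence number, as in evt_000012.
            self.event_id = f"evt_{self.seq:06d}"

    def to_json(self) -> str:
        """Serialize the envelope for sending over the socket."""
        return json.dumps(asdict(self))


event = EventEnvelope(
    workflow_id="wf_123",
    run_id="run_456",
    seq=12,
    type="tool.result",
    payload={"tool_name": "search_docs", "status": "success"},
)
print(event.to_json())
```

Rejecting malformed events at construction time keeps bad data out of the event store, where it would otherwise surface later as a confusing replay or rendering bug.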

Useful Event Types

Start with a small set. You can add more after your UI and tracing needs become clear.

workflow.started
workflow.completed
workflow.failed

agent.plan
agent.step.started
agent.step.completed
agent.step.failed

llm.request
llm.token
llm.response
llm.error

tool.request
tool.result
tool.error

approval.required
approval.received

debug.trace
heartbeat

Keep token streaming separate from workflow status. A token stream is high volume and UI-facing. Workflow events are lower volume and operational. If you mix them together without clear types, debugging gets harder.

Build the WebSocket Server

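The example below uses FastAPI. It accepts a WebSocket connection, authenticates the user, starts an agent workflow, emits ordered events, and supports resume with last_seq. To stay short, the demo passes a hard-coded token in the query string. See Secure the Connection for production guidance.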

from fastapi import FastAPI, WebSocket, WebSocketDisconnect, Query
from datetime import datetime, timezone
import asyncio
import json
import uuid

app = FastAPI()

# Demo in-memory event store.
# Use Redis, Postgres, or a durable queue in production.
EVENTS_BY_RUN = {}

def now_iso():
    return datetime.now(timezone.utc).isoformat()

async def authenticate(token: str):
    if not token or token != "dev-token":
        return None
    return {"user_id": "user_123", "org_id": "org_456"}

class EventWriter:
    def __init__(self, websocket: WebSocket, workflow_id: str, run_id: str):
        self.websocket = websocket
        self.workflow_id = workflow_id
        self.run_id = run_id
        self.seq = len(EVENTS_BY_RUN.get(run_id, []))

    async def emit(self, event_type: str, payload: dict, parent_event_id: str | None = None):
        self.seq += 1
        event = {
            "workflow_id": self.workflow_id,
            "run_id": self.run_id,
            "seq": self.seq,
            "type": event_type,
            "ts": now_iso(),
            "trace_id": f"trace_{self.run_id}",
            "parent_event_id": parent_event_id,
            "event_id": f"evt_{self.seq:06d}",
            "payload": payload,
        }

        EVENTS_BY_RUN.setdefault(self.run_id, []).append(event)
        await self.websocket.send_text(json.dumps(event))

async def replay_events(websocket: WebSocket, run_id: str, last_seq: int):
    events = EVENTS_BY_RUN.get(run_id, [])
    for event in events:
        if event["seq"] > last_seq:
            await websocket.send_text(json.dumps(event))

async def run_agent_workflow(writer: EventWriter, user_message: str):
    await writer.emit("workflow.started", {
        "message": user_message
    })

    await writer.emit("agent.plan", {
        "steps": [
            "Classify the request",
            "Search internal docs",
            "Draft answer",
            "Run quality check"
        ]
    })

    await writer.emit("agent.step.started", {
        "step_name": "classify_request"
    })

    await asyncio.sleep(0.3)

    await writer.emit("llm.request", {
        "model": "gpt-4.1-mini",
        "prompt_name": "classify_support_request",
        "prompt_version": 7
    })

    await asyncio.sleep(0.5)

    await writer.emit("llm.response", {
        "status": "success",
        "latency_ms": 512,
        "output": {
            "category": "billing",
            "priority": "medium"
        }
    })

    await writer.emit("agent.step.completed", {
        "step_name": "classify_request"
    })

    await writer.emit("tool.request", {
        "tool_name": "search_docs",
        "args": {
            "query": "billing refund policy"
        }
    })

    await asyncio.sleep(0.8)

    await writer.emit("tool.result", {
        "tool_name": "search_docs",
        "status": "success",
        "duration_ms": 804,
        "result_count": 4
    })

    await writer.emit("agent.step.started", {
        "step_name": "draft_answer"
    })

    await writer.emit("llm.request", {
        "model": "gpt-4.1",
        "prompt_name": "draft_support_reply",
        "prompt_version": 12
    })

    answer = "You can request a refund within 30 days if the charge meets the refund policy."
    for token in answer.split(" "):
        await writer.emit("llm.token", {
            "text": token + " "
        })
        await asyncio.sleep(0.05)

    await writer.emit("llm.response", {
        "status": "success",
        "latency_ms": 1394
    })

    await writer.emit("workflow.completed", {
        "status": "success"
    })

@app.websocket("/ws/workflows/{workflow_id}")
async def workflow_socket(
    websocket: WebSocket,
    workflow_id: str,
    token: str = Query(default=""),
    run_id: str | None = Query(default=None),
    last_seq: int = Query(default=0)
):
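    # Accept first so the close code reaches the client. If the server closes
    # before accepting, the handshake is rejected with HTTP 403 and the
    # browser never sees code 1008.
    await websocket.accept()

    user = await authenticate(token)
    if not user:
        await websocket.close(code=1008, reason="Unauthorized")
        return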

    if not run_id:
        run_id = f"run_{uuid.uuid4().hex}"

    await replay_events(websocket, run_id, last_seq)

    writer = EventWriter(websocket, workflow_id, run_id)

    try:
        raw_message = await websocket.receive_text()
        message = json.loads(raw_message)

        if message.get("type") != "workflow.start":
            await writer.emit("workflow.failed", {
                "error": "First message must be workflow.start"
            })
            await websocket.close(code=1003)
            return

        user_message = message["payload"]["message"]
        await run_agent_workflow(writer, user_message)

    except WebSocketDisconnect:
        print(f"Client disconnected from run {run_id}")
    except Exception as exc:
        try:
            await writer.emit("workflow.failed", {
                "error": str(exc)
            })
        finally:
            await websocket.close(code=1011)

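This demo keeps events in memory so the core pattern is clear. It also runs the workflow inside the connection handler, so a disconnect stops the run and a reconnect can only replay events that were already stored. In production, run the workflow in a background task or worker and store events in durable storage so the run can continue while the client is away. A common setup is Redis Streams for short-term replay and Postgres for longer-term audit history.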

Connect From the Browser Client

The client needs to open the socket, send a start command, render events in sequence, and reconnect with the latest sequence number if the connection drops.

const workflowId = "support_triage";
const token = "dev-token";

let runId = localStorage.getItem("current_run_id");
let lastSeq = Number(localStorage.getItem("last_seq") || "0");
let socket = null;
let reconnectAttempts = 0;

function connect() {
  const params = new URLSearchParams({
    token,
    last_seq: String(lastSeq)
  });

  if (runId) {
    params.set("run_id", runId);
  }

  socket = new WebSocket(
    `ws://localhost:8000/ws/workflows/${workflowId}?${params.toString()}`
  );

  socket.onopen = () => {
    reconnectAttempts = 0;
    setConnectionState("connected");

    if (!runId) {
      socket.send(JSON.stringify({
        type: "workflow.start",
        payload: {
          message: "Can this customer get a refund?"
        }
      }));
    }
  };

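  // Set when a terminal event arrives so a normal close does not trigger a reconnect.
  let runFinished = false;

  socket.onmessage = (message) => {
    const event = JSON.parse(message.data);

    // Drop duplicates that arrive during a replay.
    if (event.seq <= lastSeq) {
      return;
    }

    if (event.seq !== lastSeq + 1) {
      console.warn("Gap in event sequence", {
        expected: lastSeq + 1,
        received: event.seq
      });
    }

    runId = event.run_id;
    lastSeq = event.seq;

    localStorage.setItem("current_run_id", runId);
    localStorage.setItem("last_seq", String(lastSeq));

    if (event.type === "workflow.completed" || event.type === "workflow.failed") {
      runFinished = true;
    }

    renderWorkflowEvent(event);
  };

  socket.onclose = (closeEvent) => {
    setConnectionState("disconnected");

    if (closeEvent.code === 1008) {
      showError("You are not authorized to run this workflow.");
      return;
    }

    // The run already reached a terminal state, so there is nothing to resume.
    if (runFinished) {
      return;
    }

    scheduleReconnect();
  };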

  socket.onerror = () => {
    setConnectionState("error");
  };
}

function scheduleReconnect() {
  reconnectAttempts += 1;
  const delayMs = Math.min(1000 * 2 ** reconnectAttempts, 10000);

  setTimeout(() => {
    connect();
  }, delayMs);
}

connect();

This client stores run_id and last_seq in localStorage. If the browser refreshes or the network drops, the next connection asks the server to replay events after last_seq.

Render Workflow Events in the UI

Your UI should render event types differently. Tool calls, plan updates, errors, and final output are separate user experiences.

Screenshot: client UI updates as ordered workflow events arrive (plan steps, tool calls, streamed answer tokens, completion status, reconnect state).

function renderWorkflowEvent(event) {
  appendTimelineEvent(event);

  switch (event.type) {
    case "workflow.started":
      setStatus("Workflow started");
      break;

    case "agent.plan":
      renderPlan(event.payload.steps);
      break;

    case "agent.step.started":
      markStepRunning(event.payload.step_name);
      break;

    case "agent.step.completed":
      markStepComplete(event.payload.step_name);
      break;

    case "tool.request":
      addToolCall({
        name: event.payload.tool_name,
        status: "running",
        args: event.payload.args
      });
      break;

    case "tool.result":
      updateToolCall({
        name: event.payload.tool_name,
        status: event.payload.status,
        resultCount: event.payload.result_count,
        durationMs: event.payload.duration_ms
      });
      break;

    case "llm.token":
      appendAnswerToken(event.payload.text);
      break;

    case "workflow.completed":
      setStatus("Complete");
      clearActiveRun();
      break;

    case "workflow.failed":
      setStatus("Failed");
      showError(event.payload.error);
      // A failed run cannot be resumed, so drop the stored run_id and last_seq.
      clearActiveRun();
      break;

    default:
      console.debug("Unhandled event", event);
  }
}

function clearActiveRun() {
  localStorage.removeItem("current_run_id");
  localStorage.removeItem("last_seq");
}

Keep a timeline panel in your developer and internal admin views. It helps you debug the order of events without reading server logs.

Screenshot: ordered event timeline with sequence numbers and event types (workflow started, agent plan, LLM request, tool request, tool result, streamed tokens, workflow completed).

Handle Disconnects and Resume Correctly

WebSocket disconnects are normal. Laptops sleep, mobile networks switch, proxies time out, and deploys restart servers. Design for this from the first version.

A reliable resume flow looks like this:

  1. The server assigns a run_id and sends events with increasing seq.
  2. The client stores the latest confirmed seq.
  3. The connection drops.
  4. The client reconnects with run_id and last_seq.
  5. The server replays all events where seq > last_seq.
  6. The client ignores duplicate events where seq <= last_seq.
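
The steps above can be sketched as two small pieces of logic: a server-side filter that selects the events to replay, and a client-side cursor that drops duplicates and records gaps. This is a minimal illustration written in Python to match the server examples. The names events_to_replay and ClientCursor are hypothetical, and the same cursor logic ports directly to the browser client.

```python
from typing import Iterable


def events_to_replay(stored: Iterable[dict], last_seq: int) -> list[dict]:
    """Server side: return every event the client has not confirmed, in seq order."""
    missed = (event for event in stored if event["seq"] > last_seq)
    return sorted(missed, key=lambda event: event["seq"])


class ClientCursor:
    """Client side: tracks the latest applied seq and ignores duplicates."""

    def __init__(self, last_seq: int = 0):
        self.last_seq = last_seq
        self.gaps: list[tuple[int, int]] = []  # (expected, received) pairs

    def accept(self, event: dict) -> bool:
        """Return True if the event is new and should be rendered."""
        seq = event["seq"]
        if seq <= self.last_seq:
            return False  # duplicate delivered by a replay
        if seq != self.last_seq + 1:
            # Record the gap so the caller can log it or refetch state.
            self.gaps.append((self.last_seq + 1, seq))
        self.last_seq = seq
        return True


stored = [{"seq": n, "type": "llm.token"} for n in range(1, 6)]
cursor = ClientCursor()
for event in stored[:3]:          # events 1 to 3 arrive, then the socket drops
    cursor.accept(event)
for event in events_to_replay(stored, cursor.last_seq):
    cursor.accept(event)          # the replay delivers events 4 and 5
print(cursor.last_seq, cursor.gaps)
```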

Do not rely on token streams alone for recovery. If a disconnect happens during token streaming, the client may miss part of the text. Send a final llm.response or workflow.completed event with the full final answer, or let the client fetch the final run state over HTTP after reconnect.

Add Heartbeats

Heartbeats help you detect dead connections and keep some proxies from closing idle sockets. Send a lightweight event every 15 to 30 seconds when the workflow has no other activity.

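async def heartbeat_loop(writer: EventWriter, stop_event: asyncio.Event, interval_s: float = 20):
    while not stop_event.is_set():
        try:
            # Wake up as soon as stop_event is set instead of sleeping the full interval.
            await asyncio.wait_for(stop_event.wait(), timeout=interval_s)
        except asyncio.TimeoutError:
            # The interval elapsed without a stop signal, so send a heartbeat.
            await writer.emit("heartbeat", {
                "status": "alive"
            })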

On the client, update a lastHeartbeatAt timestamp. If no events arrive for 60 seconds during an active workflow, show a reconnecting state or reopen the socket.
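
The client-side check can be as small as a timestamp and a comparison. The sketch below shows one possible shape, written in Python to match the server examples. HeartbeatWatchdog and its injectable clock parameter are hypothetical names, and the 60-second default mirrors the threshold mentioned above.

```python
import time
from typing import Callable


class HeartbeatWatchdog:
    """Tracks when the last event arrived and reports a stale connection."""

    def __init__(self, timeout_s: float = 60.0, clock: Callable[[], float] = time.monotonic):
        self.timeout_s = timeout_s
        self.clock = clock  # injectable so the logic can be tested without sleeping
        self.last_event_at = clock()

    def on_event(self) -> None:
        # Call this for every received event, heartbeats included.
        self.last_event_at = self.clock()

    def is_stale(self) -> bool:
        # True when nothing has arrived for longer than the timeout.
        return self.clock() - self.last_event_at > self.timeout_s
```

A client would call on_event() from its message handler and poll is_stale() on a timer while a workflow is active, switching to a reconnecting state when it returns True.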

Handle Backpressure

Agent workflows can generate many events. Token streaming can produce dozens of events per second. Tool output can be large. If you send everything as fast as possible, slow clients can fall behind and memory can grow on your server.

Use these limits as a starting point:

  • Cap individual event payloads at 32 KB unless you have a clear reason to allow more.
  • Batch token events every 50 to 100 milliseconds instead of sending one WebSocket message per token.
  • Limit each active connection to a bounded queue, such as 500 pending events.
  • Send large artifacts, such as full tool output or generated files, through object storage or HTTP download URLs.
  • Close connections that stay behind for too long, sending a clear error event first when possible.
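
As a rough illustration of the payload cap and the bounded queue from the list above, the sketch below wraps asyncio.Queue with both limits. BoundedEventQueue and SlowClientError are hypothetical names, and the constants simply reuse the starting-point numbers from the list.

```python
import asyncio
import json

MAX_PAYLOAD_BYTES = 32 * 1024   # assumed cap, taken from the list above
MAX_PENDING_EVENTS = 500        # assumed queue bound, taken from the list above


class SlowClientError(Exception):
    """Raised when a connection's pending queue is full."""


class BoundedEventQueue:
    """Per-connection queue that refuses oversized or excess events."""

    def __init__(self, max_pending: int = MAX_PENDING_EVENTS,
                 max_payload_bytes: int = MAX_PAYLOAD_BYTES):
        self._queue: asyncio.Queue = asyncio.Queue(maxsize=max_pending)
        self._max_payload_bytes = max_payload_bytes

    def offer(self, event: dict) -> None:
        """Enqueue without blocking the workflow; raise if a limit is hit."""
        size = len(json.dumps(event["payload"]).encode("utf-8"))
        if size > self._max_payload_bytes:
            raise ValueError(f"payload is {size} bytes, above the cap")
        try:
            self._queue.put_nowait(event)
        except asyncio.QueueFull:
            raise SlowClientError("client fell too far behind") from None

    async def next_event(self) -> dict:
        """Called by the sender loop that writes to the socket."""
        return await self._queue.get()
```

In this design the workflow calls offer() and a separate sender task drains next_event() into the socket. When offer() raises SlowClientError, the gateway can send a final error event and close the connection instead of letting memory grow.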

Here is a simple token buffer that emits text chunks every 75 milliseconds:

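class TokenBuffer:
    """Collects tokens and emits them as one llm.token event per flush interval."""

    def __init__(self, writer: EventWriter, flush_ms: int = 75):
        self.writer = writer
        # asyncio.sleep takes seconds, so convert once here.
        self.flush_interval_s = flush_ms / 1000
        self.buffer = []
        self.lock = asyncio.Lock()
        self.closed = False

    async def add(self, token: str):
        async with self.lock:
            self.buffer.append(token)

    async def flush_loop(self):
        # Run this as a background task. close() stops it after the next wake-up.
        while not self.closed:
            await asyncio.sleep(self.flush_interval_s)
            await self.flush()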

    async def flush(self):
        async with self.lock:
            if not self.buffer:
                return

            text = "".join(self.buffer)
            self.buffer = []

        await self.writer.emit("llm.token", {
            "text": text
        })

    async def close(self):
        self.closed = True
        await self.flush()

Secure the Connection

WebSockets still need the same security discipline as HTTP endpoints. Do not treat the socket as trusted after it opens.

  • Authenticate before accepting work: Validate a short-lived token or session before starting the workflow.
  • Authorize the workflow: Check that the user can run the requested workflow and access the requested data.
  • Validate every client message: Use a schema for workflow.start, approval.received, and cancellation messages.
  • Use wss:// in production: Never send workflow data over plain WebSocket connections in production.
  • Do not put long-lived secrets in query strings: Query strings can end up in logs. Prefer secure cookies or short-lived signed tokens.
  • Redact sensitive payload fields: Tool results may contain customer data, credentials, or internal documents.

For browser clients, a common production pattern is:

  1. The user authenticates through your normal web app session.
  2. The browser requests a short-lived WebSocket token over HTTPS.
  3. The browser opens wss:// with that token.
  4. The server validates the token and binds the socket to the user and organization.
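
Steps 2 and 4 can be sketched with a short-lived HMAC-signed token built from the standard library. This is a simplified illustration under stated assumptions, not a replacement for a vetted token library. The function names, the token layout, and the 60-second lifetime are all hypothetical.

```python
import base64
import hashlib
import hmac
import json
import time
from typing import Optional


def _sign(secret: bytes, body: str) -> str:
    return hmac.new(secret, body.encode("utf-8"), hashlib.sha256).hexdigest()


def issue_ws_token(secret: bytes, user_id: str, org_id: str,
                   ttl_s: int = 60, now: Optional[float] = None) -> str:
    """Step 2: mint a token that binds the user and org and expires quickly."""
    issued_at = time.time() if now is None else now
    claims = {"user_id": user_id, "org_id": org_id, "exp": int(issued_at) + ttl_s}
    body = base64.urlsafe_b64encode(
        json.dumps(claims, sort_keys=True).encode("utf-8")
    ).decode("ascii")
    return f"{body}.{_sign(secret, body)}"


def verify_ws_token(secret: bytes, token: str,
                    now: Optional[float] = None) -> Optional[dict]:
    """Step 4: return the claims if the signature and expiry check out, else None."""
    try:
        body, signature = token.rsplit(".", 1)
    except ValueError:
        return None
    expected = _sign(secret, body)
    # Constant-time comparison avoids leaking how much of the signature matched.
    if not hmac.compare_digest(signature.encode("utf-8"), expected.encode("utf-8")):
        return None
    claims = json.loads(base64.urlsafe_b64decode(body.encode("ascii")))
    current = time.time() if now is None else now
    if claims["exp"] <= current:
        return None
    return claims
```

The gateway would call verify_ws_token() during the handshake and bind the returned user_id and org_id to the socket. Keeping the lifetime short limits the damage if a token ends up in a log.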

Support Cancellation and Approvals

Long-running agents need control messages. A user may cancel a run, approve a tool call, or reject a proposed action. Keep client-to-server messages structured too.

{
  "type": "workflow.cancel",
  "payload": {
    "reason": "user_clicked_cancel"
  }
}
{
  "type": "approval.received",
  "payload": {
    "approval_id": "appr_123",
    "approved": true
  }
}
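
Before acting on a control message, the server can check it against a small schema. The sketch below uses only the standard library. The schema table and the validate_client_message helper are hypothetical, and the required fields are inferred from the example messages above.

```python
from typing import Any

# Hypothetical schema: message type -> {payload field: (expected type, required)}.
CLIENT_MESSAGE_SCHEMAS: dict[str, dict[str, tuple[type, bool]]] = {
    "workflow.start": {"message": (str, True)},
    "workflow.cancel": {"reason": (str, False)},
    "approval.received": {"approval_id": (str, True), "approved": (bool, True)},
}


def validate_client_message(message: Any) -> list[str]:
    """Return a list of problems. An empty list means the message is valid."""
    if not isinstance(message, dict):
        return ["message must be a JSON object"]

    msg_type = message.get("type")
    if not isinstance(msg_type, str) or msg_type not in CLIENT_MESSAGE_SCHEMAS:
        return [f"unknown message type: {msg_type!r}"]

    payload = message.get("payload")
    if not isinstance(payload, dict):
        return ["payload must be a JSON object"]

    errors = []
    for name, (expected_type, required) in CLIENT_MESSAGE_SCHEMAS[msg_type].items():
        if name not in payload:
            if required:
                errors.append(f"payload.{name} is required")
        elif not isinstance(payload[name], expected_type):
            errors.append(f"payload.{name} must be {expected_type.__name__}")
    return errors
```

Returning a list of problems rather than raising lets the gateway reply with one structured error event that names every invalid field.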

On the server, run workflow work in a task and cancel it when the client sends workflow.cancel.

@app.websocket("/ws/workflows-cancelable/{workflow_id}")
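async def cancelable_workflow_socket(websocket: WebSocket, workflow_id: str):
    # Authentication is omitted here for brevity. Reuse authenticate() from
    # the main endpoint before starting any work.
    await websocket.accept()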

    run_id = f"run_{uuid.uuid4().hex}"
    writer = EventWriter(websocket, workflow_id, run_id)

    workflow_task = None

    try:
        while True:
            message = json.loads(await websocket.receive_text())

            if message["type"] == "workflow.start":
                workflow_task = asyncio.create_task(
                    run_agent_workflow(writer, message["payload"]["message"])
                )

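            elif message["type"] == "workflow.cancel":
                if workflow_task and not workflow_task.done():
                    workflow_task.cancel()
                    # Wait for the task to finish unwinding so it cannot emit
                    # more events after the cancellation notice below.
                    await asyncio.gather(workflow_task, return_exceptions=True)

                await writer.emit("workflow.failed", {
                    "error": "Workflow canceled by user",
                    "reason": message["payload"].get("reason")
                })
                await websocket.close(code=1000)
                return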

    except WebSocketDisconnect:
        if workflow_task and not workflow_task.done():
            workflow_task.cancel()

Debug Failed Agent Runs

When a workflow fails, you need more than the final error. Capture the prompt version, model, tool arguments, tool response status, latency, retries, evaluator results, and trace IDs.

Screenshot: debugging trace for a failed workflow run (prompt version, model request, tool call, error event, retry, final failed status).

Emit debug events for internal users and store richer trace data on the server. Avoid sending sensitive tool payloads to end users.

async def call_tool_with_trace(writer: EventWriter, tool_name: str, args: dict):
    await writer.emit("tool.request", {
        "tool_name": tool_name,
        "args": args
    })

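    # Use the running loop's monotonic clock. get_event_loop() is discouraged inside coroutines.
    started = asyncio.get_running_loop().time()

    try:
        # fake_tool_call is a placeholder for your real tool dispatcher.
        result = await fake_tool_call(tool_name, args)
        duration_ms = int((asyncio.get_running_loop().time() - started) * 1000)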

        await writer.emit("tool.result", {
            "tool_name": tool_name,
            "status": "success",
            "duration_ms": duration_ms,
            "result_preview": str(result)[:300]
        })

        return result

    except Exception as exc:
        duration_ms = int((asyncio.get_running_loop().time() - started) * 1000)

        await writer.emit("tool.error", {
            "tool_name": tool_name,
            "status": "error",
            "duration_ms": duration_ms,
            "error_type": exc.__class__.__name__,
            "error_message": str(exc)
        })

        await writer.emit("debug.trace", {
            "message": "Tool call failed",
            "tool_name": tool_name,
            "args_redacted": redact_args(args),
            "duration_ms": duration_ms
        })

        raise
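
The redact_args helper used above is not defined in the example. One possible shape is sketched below, assuming that sensitive values can be recognized by their key names. The list of key hints is an assumption for illustration, and a real deployment would tune it to its own tools and data.

```python
from typing import Any

# Assumed key-name hints. Extend this for your own tools.
SENSITIVE_KEY_HINTS = ("password", "secret", "token", "api_key", "authorization")


def _is_sensitive(key: Any) -> bool:
    lowered = str(key).lower()
    return any(hint in lowered for hint in SENSITIVE_KEY_HINTS)


def redact_args(value: Any) -> Any:
    """Return a copy of value with sensitive dict entries replaced by a marker."""
    if isinstance(value, dict):
        return {
            key: "[REDACTED]" if _is_sensitive(key) else redact_args(item)
            for key, item in value.items()
        }
    if isinstance(value, list):
        return [redact_args(item) for item in value]
    return value


print(redact_args({"query": "billing refund policy", "api_key": "sk-123"}))
```

Key-name matching misses secrets embedded inside free-text values, so treat this as a first filter rather than a guarantee.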

A failed run should answer these questions quickly:

  • Which prompt version ran?
  • Which model and parameters were used?
  • Which tool failed?
  • Was the error caused by validation, timeout, rate limit, or bad model output?
  • Did the agent retry?
  • What did the user see before the failure?

Persist Events for Replay and Auditing

In-memory storage works for local development only. Production systems need durable event storage.

A simple Postgres table can store workflow events:

CREATE TABLE workflow_events (
  id BIGSERIAL PRIMARY KEY,
  workflow_id TEXT NOT NULL,
  run_id TEXT NOT NULL,
  seq INTEGER NOT NULL,
  event_type TEXT NOT NULL,
  trace_id TEXT,
  parent_event_id TEXT,
  payload JSONB NOT NULL,
  created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
  UNIQUE (run_id, seq)
);

CREATE INDEX workflow_events_run_seq_idx
ON workflow_events (run_id, seq);

When a client reconnects, query by run_id and seq:

SELECT workflow_id, run_id, seq, event_type, trace_id, parent_event_id, payload, created_at
FROM workflow_events
WHERE run_id = $1
  AND seq > $2
ORDER BY seq ASC;
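
To try the append and replay queries without a Postgres instance, the sketch below swaps in SQLite from the Python standard library. This is a substitution made so the example is runnable. The table is a trimmed version of the one above, and the helper names are hypothetical.

```python
import json
import sqlite3


def open_event_store(path: str = ":memory:") -> sqlite3.Connection:
    """Create a trimmed workflow_events table (SQLite stand-in for Postgres)."""
    conn = sqlite3.connect(path)
    conn.execute("""
        CREATE TABLE IF NOT EXISTS workflow_events (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            workflow_id TEXT NOT NULL,
            run_id TEXT NOT NULL,
            seq INTEGER NOT NULL,
            event_type TEXT NOT NULL,
            payload TEXT NOT NULL,
            UNIQUE (run_id, seq)
        )
    """)
    return conn


def append_event(conn: sqlite3.Connection, event: dict) -> bool:
    """Insert one event. Return False if (run_id, seq) already exists."""
    try:
        with conn:  # commits on success, rolls back on error
            conn.execute(
                "INSERT INTO workflow_events "
                "(workflow_id, run_id, seq, event_type, payload) VALUES (?, ?, ?, ?, ?)",
                (event["workflow_id"], event["run_id"], event["seq"],
                 event["type"], json.dumps(event["payload"])),
            )
        return True
    except sqlite3.IntegrityError:
        return False


def events_after(conn: sqlite3.Connection, run_id: str, last_seq: int) -> list[dict]:
    """Replay query: every event for the run with seq greater than last_seq."""
    rows = conn.execute(
        "SELECT workflow_id, run_id, seq, event_type, payload "
        "FROM workflow_events WHERE run_id = ? AND seq > ? ORDER BY seq ASC",
        (run_id, last_seq),
    ).fetchall()
    return [
        {"workflow_id": w, "run_id": r, "seq": s, "type": t, "payload": json.loads(p)}
        for w, r, s, t, p in rows
    ]
```

The UNIQUE (run_id, seq) constraint doubles as a guard against double writes: a retried insert for the same sequence number fails cleanly instead of creating a duplicate event.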

If your agent runs in a worker process instead of the WebSocket server, publish events through Redis Streams, Kafka, NATS, or your queue of choice. The WebSocket gateway can subscribe to events for the active run and forward them to the connected client.
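
The worker-to-gateway hand-off can be prototyped in one process before a real broker is introduced. The sketch below uses asyncio queues as a stand-in for Redis Streams, Kafka, or NATS. RunEventBus and forward_events are hypothetical names, and a real broker would add persistence and cross-process delivery that this sketch does not have.

```python
import asyncio
from collections import defaultdict
from typing import Awaitable, Callable

TERMINAL_TYPES = ("workflow.completed", "workflow.failed")


class RunEventBus:
    """In-process stand-in for a message broker, keyed by run_id."""

    def __init__(self):
        self._subscribers: dict[str, list[asyncio.Queue]] = defaultdict(list)

    def subscribe(self, run_id: str) -> asyncio.Queue:
        """Gateway side: get a queue that receives events for one run."""
        queue: asyncio.Queue = asyncio.Queue()
        self._subscribers[run_id].append(queue)
        return queue

    def unsubscribe(self, run_id: str, queue: asyncio.Queue) -> None:
        self._subscribers[run_id].remove(queue)

    def publish(self, event: dict) -> None:
        """Worker side: deliver the event to every subscriber of its run."""
        for queue in self._subscribers.get(event["run_id"], []):
            queue.put_nowait(event)


async def forward_events(queue: asyncio.Queue,
                         send: Callable[[dict], Awaitable[None]]) -> None:
    """Gateway side: forward events to the socket until the run ends."""
    while True:
        event = await queue.get()
        await send(event)
        if event["type"] in TERMINAL_TYPES:
            return
```

In the gateway, send would wrap websocket.send_text, and the handler would call unsubscribe() when the socket closes so queues for dead connections do not accumulate.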

Common Mistakes

Using WebSockets for Simple Request and Response Tasks

If the model call completes in one or two seconds and returns one response, HTTP is usually better. Add WebSockets when users benefit from live progress, cancellation, approvals, or streaming.

Skipping Authentication

A WebSocket endpoint can expose internal agent steps, tool output, and customer data. Authenticate the connection and authorize the workflow before starting any work.

Sending Unstructured Logs

Strings are easy at first and painful later. Use typed events with sequence numbers. Your UI, replay logic, evals, and debugging tools will be simpler.

Ignoring Backpressure

Fast token streams and large tool outputs can overwhelm slow clients. Batch tokens, cap payload sizes, and use bounded queues.

Treating Token Streams as the Whole Workflow

Tokens are one part of the run. Send separate events for planning, tool calls, evals, approvals, errors, and completion. Otherwise, you will struggle to explain what the agent did.

Production Checklist

  • Use wss:// in production.
  • Authenticate before accepting workflow messages.
  • Authorize access to the workflow and data sources.
  • Send structured events with run_id, seq, type, and payload.
  • Persist events outside the WebSocket process.
  • Replay missed events after reconnect.
  • Ignore duplicate events on the client.
  • Batch token events every 50 to 100 milliseconds.
  • Cap event payload sizes.
  • Send heartbeats every 15 to 30 seconds during idle periods.
  • Support cancellation for long-running workflows.
  • Capture prompt version, model, latency, tool arguments, and errors in traces.
  • Redact sensitive payload fields before sending them to the browser.
  • Keep a timeline view for internal debugging.

Final Pattern

The core pattern is straightforward:

  1. Open one authenticated WebSocket connection for one workflow run.
  2. Start the workflow with a structured client message.
  3. Emit typed server events with increasing sequence numbers.
  4. Render each event based on its type.
  5. Store the latest sequence number on the client.
  6. Replay missed events after reconnect.
  7. Persist traces so failures can be inspected after the socket closes.

This gives users live progress and gives engineering teams a clean record of what the agent did. You can ship a better agent UI without losing the ability to debug production failures.


PromptLayer helps AI teams manage prompts, trace LLM calls, evaluate outputs, and inspect agent workflows in production. If you are building WebSocket-driven agents, connect your prompts, events, and traces in one place. Create a PromptLayer account to start tracking and improving your LLM workflows.
