Streaming with SSE

A2A uses Server-Sent Events (SSE) for real-time streaming. This enables agents to deliver progress updates, partial results, and artifacts as they're produced — instead of making the client wait for the complete response.

How SSE Streaming Works

When a client calls SendStreamingMessage, the server holds the HTTP connection open and sends events as they occur:

HTTP/1.1 200 OK
Content-Type: text/event-stream

data: {"taskId":"t-1","contextId":"ctx-1","status":{"state":"TASK_STATE_WORKING"}}

data: {"taskId":"t-1","contextId":"ctx-1","artifact":{"artifactId":"a-1","parts":[{"text":"partial..."}]},"lastChunk":false}

data: {"taskId":"t-1","contextId":"ctx-1","artifact":{"artifactId":"a-1","parts":[{"text":"complete result"}]},"lastChunk":true}

data: {"taskId":"t-1","contextId":"ctx-1","status":{"state":"TASK_STATE_COMPLETED"}}

Each data: line is a complete JSON object. Events are separated by blank lines.

Stream Event Types

Four types of events can appear in a stream:

StatusUpdate

Reports a task state transition:

{
  "taskId": "task-abc",
  "contextId": "ctx-123",
  "status": {
    "state": "TASK_STATE_WORKING",
    "timestamp": "2026-03-15T10:30:00Z"
  }
}

ArtifactUpdate

Delivers artifact content (potentially in chunks):

{
  "taskId": "task-abc",
  "contextId": "ctx-123",
  "artifact": {
    "artifactId": "result-1",
    "parts": [{"text": "The answer is..."}]
  },
  "lastChunk": false,
  "append": false
}

Task

A complete task snapshot (usually the final event):

{
  "id": "task-abc",
  "contextId": "ctx-123",
  "status": {"state": "TASK_STATE_COMPLETED"},
  "artifacts": [...]
}

Message

A direct message response (for simple request/reply patterns):

{
  "messageId": "msg-456",
  "role": "ROLE_AGENT",
  "parts": [{"text": "Quick answer"}]
}

Server-Side: Writing Events

In your AgentExecutor, write events to the queue:

#![allow(unused)]
fn main() {
impl AgentExecutor for MyExecutor {
    fn execute<'a>(
        &'a self,
        ctx: &'a RequestContext,
        queue: &'a dyn EventQueueWriter,
    ) -> Pin<Box<dyn Future<Output = A2aResult<()>> + Send + 'a>> {
        Box::pin(async move {
            // Signal start
            queue.write(StreamResponse::StatusUpdate(TaskStatusUpdateEvent {
                task_id: ctx.task_id.clone(),
                context_id: ContextId::new(ctx.context_id.clone()),
                status: TaskStatus::new(TaskState::Working),
                metadata: None,
            })).await?;

            // Deliver results in chunks
            for (i, chunk) in results.iter().enumerate() {
                queue.write(StreamResponse::ArtifactUpdate(
                    TaskArtifactUpdateEvent {
                        task_id: ctx.task_id.clone(),
                        context_id: ContextId::new(ctx.context_id.clone()),
                        artifact: Artifact::new("output", vec![Part::text(chunk)]),
                        append: Some(i > 0),
                        last_chunk: Some(i == results.len() - 1),
                        metadata: None,
                    }
                )).await?;
            }

            // Signal completion
            queue.write(StreamResponse::StatusUpdate(TaskStatusUpdateEvent {
                task_id: ctx.task_id.clone(),
                context_id: ContextId::new(ctx.context_id.clone()),
                status: TaskStatus::new(TaskState::Completed),
                metadata: None,
            })).await?;

            Ok(())
        })
    }
}
}

Queue Limits

The event queue uses tokio::sync::broadcast channels for fan-out to multiple subscribers:

LimitDefaultPurpose
Queue capacity64 eventsBroadcast channel ring buffer size
Max event size16 MiBRejects oversized events

With broadcast channels, writes never block — if a reader is too slow, it receives a Lagged notification and skips missed events. The task store is the source of truth; SSE is best-effort notification.

Configure these via the builder:

#![allow(unused)]
fn main() {
RequestHandlerBuilder::new(executor)
    .with_event_queue_capacity(128)
    .with_max_event_size(8 * 1024 * 1024)  // 8 MiB
    .build()
}

Client-Side: Consuming Streams

Use stream_message to receive events:

#![allow(unused)]
fn main() {
let mut stream = client
    .stream_message(params)
    .await
    .expect("connect");

while let Some(event) = stream.next().await {
    match event {
        Ok(StreamResponse::StatusUpdate(ev)) => {
            println!("State: {:?}", ev.status.state);
        }
        Ok(StreamResponse::ArtifactUpdate(ev)) => {
            println!("Artifact: {}", ev.artifact.id);
        }
        Ok(StreamResponse::Task(task)) => {
            println!("Final task: {:?}", task.status.state);
        }
        Ok(StreamResponse::Message(msg)) => {
            println!("Message: {:?}", msg);
        }
        Ok(_) => {
            // Future event types — handle gracefully
        }
        Err(e) => {
            eprintln!("Stream error: {e}");
            break;
        }
    }
}
}

Client Protections

The SSE parser includes safety limits:

  • 16 MiB buffer cap — Prevents OOM from malicious servers
  • 30-second connect timeout — Fails fast on unreachable servers
  • Partial line buffering — Handles TCP frame boundaries correctly

Re-subscribing

If a stream disconnects, re-subscribe to an existing task:

#![allow(unused)]
fn main() {
let mut stream = client
    .subscribe_to_task("task-abc")
    .await
    .expect("resubscribe");
}

The server creates a new broadcast subscriber for the task's event queue. Multiple SSE connections can be active simultaneously for the same task — each receives all events published after it subscribes. If a reader falls behind, it receives a Lagged notification and skips missed events rather than blocking other readers or the writer.

Streaming vs Synchronous

AspectSendMessageSendStreamingMessage
ResponseComplete taskSSE event stream
ProgressNo intermediate updatesReal-time updates
Long tasksClient waitsClient sees progress
NetworkSingle request/responseHeld connection
ComplexitySimpleRequires event handling

Use streaming when:

  • Tasks take more than a few seconds
  • You want to show progress to users
  • You need incremental artifact delivery

Use synchronous when:

  • Tasks complete quickly
  • You don't need progress updates
  • Simplicity is more important than responsiveness

Next Steps