Introduction

a2a-rust is a pure Rust implementation of the A2A (Agent-to-Agent) protocol v1.0.0 — an open standard for connecting AI agents over the network.

If you're building AI agents that need to talk to each other, discover capabilities, delegate tasks, and stream results — this library gives you the full protocol stack with zero unsafe code, compile-time type safety, and production-grade hardening.

What is the A2A Protocol?

The Agent-to-Agent protocol defines how AI agents discover, communicate with, and delegate work to each other. Think of it as HTTP for AI agents: a shared language that lets any agent talk to any other, regardless of implementation.

The protocol defines:

  • Agent Cards — Discovery documents describing what an agent can do
  • Tasks — Units of work with well-defined lifecycle states
  • Messages — Structured payloads carrying text, data, or binary content
  • Streaming — Real-time progress via Server-Sent Events (SSE)
  • Push Notifications — Webhook delivery for async results

Why Rust?

Rust gives you performance, safety, and correctness without compromise:

  • Zero-cost abstractions — Trait objects, generics, and async/await with no runtime overhead
  • No unsafe — The entire codebase is free of unsafe code
  • Thread safety at compile time — All public types implement Send + Sync
  • Exhaustive pattern matching — The compiler catches missing protocol states
  • Production-ready — Battle-tested HTTP via hyper, robust error handling, no panics in library code

Architecture at a Glance

a2a-rust is organized as a Cargo workspace with four crates:

┌─────────────────────────────────────────────┐
│  a2a-protocol-sdk                           │
│  umbrella re-exports + prelude              │
├──────────────────────┬──────────────────────┤
│  a2a-protocol-client │  a2a-protocol-server │
│  HTTP client         │  agent framework     │
├──────────────────────┴──────────────────────┤
│  a2a-protocol-types                         │
│  wire types, serde, no I/O                  │
└─────────────────────────────────────────────┘
  • a2a-protocol-types — All A2A wire types with serde serialization. Pure data — no I/O, no async.
  • a2a-protocol-client — HTTP client for calling remote A2A agents. Supports JSON-RPC and REST transports.
  • a2a-protocol-server — Server framework for building A2A agents. Pluggable stores, interceptors, and dispatchers.
  • a2a-protocol-sdk — Umbrella crate that re-exports everything with a convenient prelude module.

Key Features

  • Full v1.0 wire types — Every A2A type with correct JSON serialization
  • Quad transport — JSON-RPC 2.0, REST, WebSocket (websocket feature flag), and gRPC (grpc feature flag), both client and server
  • SSE streaming — Real-time SendStreamingMessage and SubscribeToTask
  • Push notifications — Pluggable PushSender with SSRF protection
  • Agent card discovery — Static and dynamic card handlers with HTTP caching (ETag, Last-Modified, 304)
  • Pluggable stores — TaskStore and PushConfigStore traits with in-memory, SQLite, and tenant-aware backends
  • Multi-tenancy — TenantAwareInMemoryTaskStore and TenantAwareSqliteTaskStore with full tenant isolation
  • Interceptor chains — Client and server middleware for auth, logging, metrics, rate limiting
  • Rate limiting — Built-in RateLimitInterceptor with per-caller fixed-window limiting
  • Client retry — Configurable RetryPolicy with exponential backoff for transient failures
  • Server startup helper — serve() reduces ~25 lines of hyper boilerplate to one call
  • Request ID propagation — CallContext::request_id auto-extracted from the X-Request-ID header
  • Task store metrics — TaskStore::count() for monitoring and capacity management
  • Task state machine — Validated transitions per the A2A specification
  • Executor ergonomics — boxed_future, the agent_executor! macro, and EventEmitter reduce boilerplate
  • Executor timeout — Kills hung agent tasks automatically
  • CORS support — Configurable cross-origin policies
  • Fully configurable — All defaults (timeouts, limits, intervals) are overridable via builders
  • Mutation-tested — Zero surviving mutants enforced via cargo-mutants CI gate
  • No panics in library code — All fallible operations return Result
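
The per-caller fixed-window limiting mentioned above can be sketched with nothing but the standard library. This is an illustration of the technique, not the library's RateLimitInterceptor; the names here (FixedWindow, check) are hypothetical:

```rust
use std::collections::HashMap;
use std::time::{Duration, Instant};

/// Hypothetical per-caller fixed-window counter, illustrating the idea
/// behind a rate-limiting interceptor.
struct FixedWindow {
    window: Duration,
    limit: u32,
    // caller id -> (window start, requests seen in that window)
    counters: HashMap<String, (Instant, u32)>,
}

impl FixedWindow {
    fn new(window: Duration, limit: u32) -> Self {
        Self { window, limit, counters: HashMap::new() }
    }

    /// Returns true if the caller is still within its budget for the
    /// current window.
    fn check(&mut self, caller: &str, now: Instant) -> bool {
        let entry = self.counters.entry(caller.to_string()).or_insert((now, 0));
        if now.duration_since(entry.0) >= self.window {
            *entry = (now, 0); // window expired: start a fresh one
        }
        entry.1 += 1;
        entry.1 <= self.limit
    }
}
```

A real interceptor would additionally evict stale caller entries and map a rejection to a protocol error response.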

All 11 Protocol Methods

  • SendMessage — Synchronous message send; returns the completed task
  • SendStreamingMessage — Streaming send with real-time SSE events
  • GetTask — Retrieve a task by ID
  • ListTasks — Query tasks with filtering and pagination
  • CancelTask — Request cancellation of a running task
  • SubscribeToTask — Re-subscribe to an existing task's event stream
  • CreateTaskPushNotificationConfig — Register a webhook for push delivery
  • GetTaskPushNotificationConfig — Retrieve a push config by ID
  • ListTaskPushNotificationConfigs — List all push configs for a task
  • DeleteTaskPushNotificationConfig — Remove a push config
  • GetExtendedAgentCard — Fetch the authenticated agent card

Installation

Requirements

  • Rust 1.93+ (stable)
  • Network access to crates.io for downloading dependencies

Adding to Your Project

The easiest way to use a2a-rust is through the umbrella SDK crate, which re-exports everything you need:

[dependencies]
a2a-protocol-sdk = "0.2"
tokio = { version = "1", features = ["full"] }

The SDK crate re-exports a2a-protocol-types, a2a-protocol-client, and a2a-protocol-server so you only need one dependency.

Individual Crates

If you prefer fine-grained control, depend on individual crates:

# Types only (no I/O, no async runtime)
a2a-protocol-types = "0.2"

# Client only
a2a-protocol-client = "0.2"

# Server only
a2a-protocol-server = "0.2"

This is useful when:

  • You're building a client only and don't need server types
  • You're building a server only and don't need client types
  • You want to minimize compile times and dependency trees

Feature Flags

All features are off by default to minimize compile times and dependency trees.

a2a-protocol-types

  • signing — JWS/ES256 agent card signing (RFC 8785 canonicalization)

a2a-protocol-client

  • tls-rustls — HTTPS via rustls (no OpenSSL required)
  • signing — Agent card signing verification
  • tracing — Structured logging via the tracing crate
  • websocket — WebSocket transport via tokio-tungstenite
  • grpc — gRPC transport via tonic

a2a-protocol-server

  • signing — Agent card signing
  • tracing — Structured logging via the tracing crate
  • sqlite — SQLite-backed task and push config stores via sqlx
  • websocket — WebSocket transport via tokio-tungstenite
  • grpc — gRPC transport via tonic
  • otel — OpenTelemetry metrics via opentelemetry-otlp

a2a-protocol-sdk (umbrella)

  • signing — Enables signing across types, client, and server
  • tracing — Enables tracing across client and server
  • tls-rustls — Enables HTTPS in the client
  • grpc — Enables gRPC across client and server
  • otel — Enables OpenTelemetry metrics in the server

Enable features in your Cargo.toml:

[dependencies]
a2a-protocol-sdk = { version = "0.2", features = ["tracing", "signing"] }

# Or with individual crates:
a2a-protocol-server = { version = "0.2", features = ["tracing", "sqlite"] }
a2a-protocol-client = { version = "0.2", features = ["tls-rustls"] }

Verifying the Installation

Create a simple main.rs to verify everything compiles:

use a2a_protocol_sdk::prelude::*;

fn main() {
    // Create a task status
    let status = TaskStatus::new(TaskState::Submitted);
    println!("Task state: {:?}", status.state);

    // Create a message with a text part
    let part = Part::text("Hello, A2A!");
    println!("Part: {:?}", part);

    // Verify agent capabilities builder
    let caps = AgentCapabilities::none()
        .with_streaming(true)
        .with_push_notifications(false);
    println!("Capabilities: {:?}", caps);
}

Run it:

cargo run

If this compiles and runs, you're ready to go.

Quick Start

This guide gets you running the built-in echo agent example in under 5 minutes. The example demonstrates the full A2A stack: agent executor, dual-transport server, and client — all in a single binary.

Running the Echo Agent

Clone the repository and run the example:

git clone https://github.com/tomtom215/a2a-rust.git
cd a2a-rust
cargo run -p echo-agent

You'll see output like:

=== A2A Echo Agent Example ===

JSON-RPC server listening on http://127.0.0.1:54321
REST server listening on http://127.0.0.1:54322

--- Demo 1: Synchronous SendMessage (JSON-RPC) ---
  Task ID:    550e8400-e29b-41d4-a716-446655440000
  Status:     Completed
  Artifact:   echo-artifact
  Content:    Echo: Hello from JSON-RPC client!

--- Demo 2: Streaming SendMessage (JSON-RPC) ---
  Status update: Working
  Artifact update: echo-artifact
  Content:    Echo: Hello from streaming client!
  Status update: Completed

--- Demo 3: Synchronous SendMessage (REST) ---
  Task ID:    ...
  Status:     Completed
  Content:    Echo: Hello from REST client!

--- Demo 4: Streaming SendMessage (REST) ---
  Status update: Working
  Content:    Echo: Hello from REST streaming!
  Status update: Completed

--- Demo 5: GetTask ---
  Fetched task: ... (Completed)

=== All demos completed successfully! ===

What Just Happened?

The example exercised all major protocol operations:

  1. Synchronous send (JSON-RPC) — Client sends a message, waits for the complete task
  2. Streaming send (JSON-RPC) — Client receives real-time SSE events as the agent works
  3. Synchronous send (REST) — Same operation over the REST transport
  4. Streaming send (REST) — SSE streaming over REST
  5. GetTask — Retrieves a previously completed task by ID

The Code in Brief

The echo agent is about 100 lines of Rust. Here's the core executor:

use a2a_protocol_sdk::prelude::*;
use std::future::Future;
use std::pin::Pin;

struct EchoExecutor;

impl AgentExecutor for EchoExecutor {
    fn execute<'a>(
        &'a self,
        ctx: &'a RequestContext,
        queue: &'a dyn EventQueueWriter,
    ) -> Pin<Box<dyn Future<Output = A2aResult<()>> + Send + 'a>> {
        Box::pin(async move {
            // 1. Transition to Working
            queue.write(StreamResponse::StatusUpdate(TaskStatusUpdateEvent {
                task_id: ctx.task_id.clone(),
                context_id: ContextId::new(ctx.context_id.clone()),
                status: TaskStatus::new(TaskState::Working),
                metadata: None,
            })).await?;

            // 2. Extract text from incoming message
            let input = ctx.message.parts.iter()
                .find_map(|p| match &p.content {
                    a2a_protocol_types::message::PartContent::Text { text } => Some(text.as_str()),
                    _ => None,
                })
                .unwrap_or("<no text>");

            // 3. Echo back as an artifact
            queue.write(StreamResponse::ArtifactUpdate(TaskArtifactUpdateEvent {
                task_id: ctx.task_id.clone(),
                context_id: ContextId::new(ctx.context_id.clone()),
                artifact: Artifact::new("echo-artifact", vec![Part::text(
                    &format!("Echo: {input}")
                )]),
                append: None,
                last_chunk: Some(true),
                metadata: None,
            })).await?;

            // 4. Transition to Completed
            queue.write(StreamResponse::StatusUpdate(TaskStatusUpdateEvent {
                task_id: ctx.task_id.clone(),
                context_id: ContextId::new(ctx.context_id.clone()),
                status: TaskStatus::new(TaskState::Completed),
                metadata: None,
            })).await?;

            Ok(())
        })
    }
}

The pattern is always: write status updates and artifacts to the event queue, then return Ok(()).

With Tracing

Enable structured logging to see the protocol internals:

cargo run -p echo-agent --features tracing
RUST_LOG=debug cargo run -p echo-agent --features tracing

Your First Agent

This guide walks you through building a complete A2A agent from scratch — a calculator that evaluates simple arithmetic expressions.

Project Setup

Create a new binary crate:

cargo new my-agent
cd my-agent

Add dependencies to Cargo.toml:

[dependencies]
a2a-protocol-sdk = "0.2"
tokio = { version = "1", features = ["full"] }
uuid = { version = "1", features = ["v4"] }

Step 1: Define Your Executor

The AgentExecutor trait is the entry point for all agent logic. Implement it to define what your agent does when it receives a message:

use a2a_protocol_sdk::prelude::*;
use a2a_protocol_sdk::server::RequestContext;
use std::future::Future;
use std::pin::Pin;

struct CalcExecutor;

impl AgentExecutor for CalcExecutor {
    fn execute<'a>(
        &'a self,
        ctx: &'a RequestContext,
        queue: &'a dyn EventQueueWriter,
    ) -> Pin<Box<dyn Future<Output = A2aResult<()>> + Send + 'a>> {
        Box::pin(async move {
            // Signal that we're working
            queue.write(StreamResponse::StatusUpdate(TaskStatusUpdateEvent {
                task_id: ctx.task_id.clone(),
                context_id: ContextId::new(ctx.context_id.clone()),
                status: TaskStatus::new(TaskState::Working),
                metadata: None,
            })).await?;

            // Extract the expression from the message
            let expr = ctx.message.parts.iter()
                .find_map(|p| match &p.content {
                    a2a_protocol_types::message::PartContent::Text { text } => Some(text.clone()),
                    _ => None,
                })
                .unwrap_or_default();

            // Evaluate (very basic: just handle "a + b")
            let result = evaluate(&expr);

            // Send the result as an artifact
            queue.write(StreamResponse::ArtifactUpdate(TaskArtifactUpdateEvent {
                task_id: ctx.task_id.clone(),
                context_id: ContextId::new(ctx.context_id.clone()),
                artifact: Artifact::new(
                    "result",
                    vec![Part::text(&result)],
                ),
                append: None,
                last_chunk: Some(true),
                metadata: None,
            })).await?;

            // Done
            queue.write(StreamResponse::StatusUpdate(TaskStatusUpdateEvent {
                task_id: ctx.task_id.clone(),
                context_id: ContextId::new(ctx.context_id.clone()),
                status: TaskStatus::new(TaskState::Completed),
                metadata: None,
            })).await?;

            Ok(())
        })
    }
}

fn evaluate(expr: &str) -> String {
    // Toy parser: "3 + 5", "10 - 2", etc.
    let parts: Vec<&str> = expr.split_whitespace().collect();
    if parts.len() != 3 {
        return format!("Error: expected 'a op b', got '{expr}'");
    }
    let a: f64 = match parts[0].parse() {
        Ok(v) => v,
        Err(_) => return format!("Error: invalid number '{}'", parts[0]),
    };
    let b: f64 = match parts[2].parse() {
        Ok(v) => v,
        Err(_) => return format!("Error: invalid number '{}'", parts[2]),
    };
    match parts[1] {
        "+" => format!("{}", a + b),
        "-" => format!("{}", a - b),
        "*" => format!("{}", a * b),
        "/" if b != 0.0 => format!("{}", a / b),
        "/" => "Error: division by zero".into(),
        op => format!("Error: unknown operator '{op}'"),
    }
}

Step 2: Create the Agent Card

The agent card tells clients what your agent can do:

use a2a_protocol_sdk::types::agent_card::*;

fn make_agent_card(url: &str) -> AgentCard {
    AgentCard {
        name: "Calculator Agent".into(),
        description: "Evaluates simple arithmetic expressions".into(),
        version: "1.0.0".into(),
        supported_interfaces: vec![AgentInterface {
            url: url.into(),
            protocol_binding: "JSONRPC".into(),
            protocol_version: "1.0.0".into(),
            tenant: None,
        }],
        default_input_modes: vec!["text/plain".into()],
        default_output_modes: vec!["text/plain".into()],
        skills: vec![AgentSkill {
            id: "calc".into(),
            name: "Calculator".into(),
            description: "Evaluates expressions like '3 + 5'".into(),
            tags: vec!["math".into(), "calculator".into()],
            examples: Some(vec![
                "3 + 5".into(),
                "10 * 2".into(),
                "100 / 4".into(),
            ]),
            input_modes: None,
            output_modes: None,
            security_requirements: None,
        }],
        capabilities: AgentCapabilities::none()
            .with_streaming(true)
            .with_push_notifications(false),
        provider: None,
        icon_url: None,
        documentation_url: None,
        security_schemes: None,
        security_requirements: None,
        signatures: None,
    }
}

Step 3: Wire Up the Server

Build the request handler and start an HTTP server:

use a2a_protocol_sdk::server::{RequestHandlerBuilder, JsonRpcDispatcher};
use std::sync::Arc;

#[tokio::main]
async fn main() {
    // Build the handler with our executor and agent card
    let handler = Arc::new(
        RequestHandlerBuilder::new(CalcExecutor)
            .with_agent_card(make_agent_card("http://localhost:3000"))
            .build()
            .expect("build handler"),
    );

    // Create the JSON-RPC dispatcher
    let dispatcher = Arc::new(JsonRpcDispatcher::new(handler));

    // Start the HTTP server
    let listener = tokio::net::TcpListener::bind("127.0.0.1:3000")
        .await
        .expect("bind");
    println!("Calculator agent listening on http://127.0.0.1:3000");

    loop {
        let (stream, _) = listener.accept().await.expect("accept");
        let io = hyper_util::rt::TokioIo::new(stream);
        let dispatcher = Arc::clone(&dispatcher);
        tokio::spawn(async move {
            let service = hyper::service::service_fn(move |req| {
                let d = Arc::clone(&dispatcher);
                async move { Ok::<_, std::convert::Infallible>(d.dispatch(req).await) }
            });
            let _ = hyper_util::server::conn::auto::Builder::new(
                hyper_util::rt::TokioExecutor::new(),
            )
            .serve_connection(io, service)
            .await;
        });
    }
}

Step 4: Test with a Client

In a separate terminal (or in the same binary), create a client:

use a2a_protocol_sdk::prelude::*;
use a2a_protocol_sdk::client::ClientBuilder;

#[tokio::main]
async fn main() {
    let client = ClientBuilder::new("http://127.0.0.1:3000".to_string())
        .build()
        .expect("build client");

    let params = MessageSendParams {
        tenant: None,
        message: Message {
            id: MessageId::new(uuid::Uuid::new_v4().to_string()),
            role: MessageRole::User,
            parts: vec![Part::text("42 + 58")],
            task_id: None,
            context_id: None,
            reference_task_ids: None,
            extensions: None,
            metadata: None,
        },
        configuration: None,
        metadata: None,
    };

    match client.send_message(params).await.expect("send message") {
        SendMessageResponse::Task(task) => {
            println!("Result: {:?}", task.status.state);
            if let Some(artifacts) = &task.artifacts {
                for art in artifacts {
                    for part in &art.parts {
                        if let a2a_protocol_types::message::PartContent::Text { text } = &part.content {
                            println!("Answer: {text}");
                        }
                    }
                }
            }
        }
        other => println!("Unexpected response: {other:?}"),
    }
}

Output:

Result: Completed
Answer: 100

The Three-Event Pattern

Almost every executor follows this pattern:

  1. Status → Working — Signal that processing has started
  2. ArtifactUpdate — Deliver results (one or more artifacts)
  3. Status → Completed — Signal that processing is done

For streaming clients, these arrive as individual SSE events. For synchronous clients, the handler collects them into a final Task response.
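
The collection step for synchronous clients can be sketched as a fold over the event stream. This is a simplified model, not the handler's actual implementation; the Event and State types below are illustrative stand-ins for the protocol's event and task-state types:

```rust
/// Illustrative stand-ins for the protocol's event and state types.
#[derive(Debug, Clone, PartialEq)]
enum Event {
    Status(State),
    Artifact(String), // artifact id
}

#[derive(Debug, Clone, Copy, PartialEq)]
enum State {
    Submitted,
    Working,
    Completed,
}

/// Fold streamed events into the final (state, artifact ids) pair that a
/// synchronous SendMessage response would carry: the last status update
/// wins, and every artifact update is accumulated.
fn collect(events: &[Event]) -> (State, Vec<String>) {
    let mut state = State::Submitted;
    let mut artifacts = Vec::new();
    for ev in events {
        match ev {
            Event::Status(s) => state = *s,
            Event::Artifact(id) => artifacts.push(id.clone()),
        }
    }
    (state, artifacts)
}
```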

Project Structure

a2a-rust is organized as a Cargo workspace with four crates, each with a clear responsibility. Understanding this structure helps you choose the right dependency for your use case.

Workspace Layout

a2a-rust/
├── Cargo.toml              # Workspace root
├── crates/
│   ├── a2a-protocol-types/          # Wire types (serde, no I/O)
│   │   └── src/
│   │       ├── lib.rs
│   │       ├── task.rs         # Task, TaskState, TaskStatus, TaskId
│   │       ├── message.rs      # Message, Part, PartContent, MessageRole
│   │       ├── artifact.rs     # Artifact, ArtifactId
│   │       ├── agent_card.rs   # AgentCard, AgentInterface, AgentSkill
│   │       ├── events.rs       # StreamResponse, status/artifact events
│   │       ├── params.rs       # MessageSendParams, TaskQueryParams, ...
│   │       ├── responses.rs    # SendMessageResponse, TaskListResponse
│   │       ├── jsonrpc.rs      # JSON-RPC 2.0 envelope types
│   │       ├── push.rs         # Push notification config types
│   │       ├── error.rs        # A2aError, ErrorCode, A2aResult
│   │       ├── security.rs     # Security schemes and requirements
│   │       └── extensions.rs   # Extension and signing types
│   │
│   ├── a2a-protocol-client/         # HTTP client
│   │   └── src/
│   │       ├── lib.rs          # A2aClient, ClientBuilder
│   │       ├── builder/        # ClientBuilder (fluent config)
│   │       │   ├── mod.rs            # Builder struct, configuration setters
│   │       │   └── transport_factory.rs  # build() / build_grpc() assembly
│   │       ├── transport/      # Transport trait + implementations
│   │       │   ├── mod.rs          # Transport trait, truncate_body
│   │       │   ├── rest/           # REST transport
│   │       │   │   ├── mod.rs          # RestTransport struct, constructors
│   │       │   │   ├── request.rs      # URI/request building, execution
│   │       │   │   ├── streaming.rs    # SSE streaming, body reader
│   │       │   │   ├── routing.rs      # Route definitions, method mapping
│   │       │   │   └── query.rs        # Query string building, encoding
│   │       │   └── jsonrpc.rs      # JsonRpcTransport
│   │       ├── streaming/      # SSE parser, EventStream
│   │       │   ├── event_stream.rs # EventStream for consuming SSE
│   │       │   └── sse_parser/     # SSE frame parser
│   │       │       ├── mod.rs          # Re-exports
│   │       │       ├── types.rs        # SseFrame, SseParseError
│   │       │       └── parser.rs       # SseParser state machine
│   │       ├── methods/        # send_message, tasks, push_config
│   │       ├── auth.rs         # CredentialsStore, AuthInterceptor
│   │       └── interceptor.rs  # CallInterceptor, InterceptorChain
│   │
│   ├── a2a-protocol-server/         # Server framework
│   │   └── src/
│   │       ├── lib.rs          # Public re-exports
│   │       ├── handler/        # RequestHandler (core orchestration)
│   │       │   ├── mod.rs          # Struct definition, SendMessageResult
│   │       │   ├── limits.rs       # HandlerLimits config
│   │       │   ├── messaging.rs    # SendMessage / SendStreamingMessage
│   │       │   ├── lifecycle/        # Task lifecycle handlers
│   │       │   │   ├── mod.rs            # Re-exports
│   │       │   │   ├── get_task.rs       # GetTask handler
│   │       │   │   ├── list_tasks.rs     # ListTasks handler
│   │       │   │   ├── cancel_task.rs    # CancelTask handler
│   │       │   │   ├── subscribe.rs      # SubscribeToTask handler
│   │       │   │   └── extended_card.rs  # GetExtendedAgentCard handler
│   │       │   ├── push_config.rs  # Push notification config CRUD
│   │       │   ├── event_processing/  # Event collection & push delivery
│   │       │   │   ├── mod.rs          # Re-exports
│   │       │   │   ├── sync_collector.rs   # Sync-mode event collection
│   │       │   │   └── background/       # Background event processor
│   │       │   │       ├── mod.rs            # Event loop orchestration
│   │       │   │       ├── state_machine.rs  # Event dispatch, state transitions
│   │       │   │       └── push_delivery.rs  # Push notification delivery
│   │       │   ├── shutdown.rs     # Graceful shutdown
│   │       │   └── helpers.rs      # Validation, context builders
│   │       ├── builder.rs      # RequestHandlerBuilder
│   │       ├── executor.rs     # AgentExecutor trait
│   │       ├── executor_helpers.rs # boxed_future, agent_executor!, EventEmitter
│   │       ├── dispatch/       # Protocol dispatchers
│   │       │   ├── mod.rs          # DispatchConfig, re-exports
│   │       │   ├── rest/           # REST dispatcher
│   │       │   │   ├── mod.rs          # RestDispatcher, route handlers
│   │       │   │   ├── response.rs     # HTTP response helpers
│   │       │   │   └── query.rs        # Query/URL parsing utilities
│   │       │   ├── jsonrpc/        # JSON-RPC 2.0 dispatcher
│   │       │   │   ├── mod.rs          # JsonRpcDispatcher, dispatch logic
│   │       │   │   └── response.rs     # JSON-RPC response serialization
│   │       │   └── grpc/           # gRPC dispatcher (feature-gated)
│   │       │       ├── mod.rs          # Proto includes, re-exports
│   │       │       ├── config.rs       # GrpcConfig
│   │       │       ├── dispatcher.rs   # GrpcDispatcher, server setup
│   │       │       ├── service.rs      # A2aService trait implementation
│   │       │       └── helpers.rs      # JSON codec, error mapping
│   │       ├── store/          # Task persistence
│   │       │   ├── mod.rs          # Re-exports
│   │       │   ├── task_store/     # TaskStore trait + in-memory impl
│   │       │   │   ├── mod.rs          # TaskStore trait, TaskStoreConfig
│   │       │   │   └── in_memory/      # InMemoryTaskStore
│   │       │   │       ├── mod.rs          # Core CRUD, TaskStore impl
│   │       │   │       └── eviction.rs     # TTL + capacity eviction
│   │       │   └── tenant/         # Multi-tenant isolation
│   │       │       ├── mod.rs          # Re-exports
│   │       │       ├── context.rs      # TenantContext (task-local)
│   │       │       └── store.rs        # TenantAwareInMemoryTaskStore
│   │       ├── push/           # PushConfigStore, PushSender
│   │       ├── streaming/      # Event streaming
│   │       │   ├── mod.rs          # Re-exports
│   │       │   ├── sse.rs          # SSE response builder
│   │       │   └── event_queue/    # Event queue system
│   │       │       ├── mod.rs          # Traits, constants, constructors
│   │       │       ├── in_memory.rs    # Broadcast-backed queue impl
│   │       │       └── manager.rs      # EventQueueManager
│   │       ├── agent_card/     # Static/Dynamic card handlers
│   │       ├── call_context.rs # CallContext with HTTP headers
│   │       ├── metrics.rs      # Metrics trait
│   │       ├── rate_limit.rs   # RateLimitInterceptor, RateLimitConfig
│   │       ├── serve.rs        # serve(), serve_with_addr() helpers
│   │       └── request_context.rs  # RequestContext
│   │
│   └── a2a-protocol-sdk/            # Umbrella crate
│       └── src/
│           └── lib.rs          # Re-exports + prelude
│
├── examples/
│   └── echo-agent/         # Working example agent
│
├── docs/
│   └── adr/                # Architecture Decision Records
│
└── fuzz/                   # Fuzz testing targets

Crate Dependencies

a2a-protocol-sdk
├── a2a-protocol-client
│   └── a2a-protocol-types
├── a2a-protocol-server
│   └── a2a-protocol-types
└── a2a-protocol-types

The dependency graph is intentionally shallow:

  • a2a-protocol-types has no internal dependencies — just serde and serde_json
  • a2a-protocol-client depends on a2a-protocol-types plus HTTP crates (hyper, hyper-util, http-body-util)
  • a2a-protocol-server depends on a2a-protocol-types plus the same HTTP stack
  • a2a-protocol-sdk depends on all three, adding nothing of its own

Choosing Your Dependency

  • Just want the types (e.g., for a custom transport) — a2a-protocol-types
  • Building a client that calls remote agents — a2a-protocol-client
  • Building an agent (server) — a2a-protocol-server
  • Building both client and server — a2a-protocol-sdk
  • Quick prototyping / examples — a2a-protocol-sdk (use the prelude)

The Prelude

The SDK's prelude module exports the most commonly used types so you can get started with a single import:

use a2a_protocol_sdk::prelude::*;

This gives you:

  • Core types: Task, TaskState, TaskStatus, Message, Part, Artifact, ArtifactId
  • ID types: TaskId, ContextId, MessageId, MessageRole
  • Events: StreamResponse, TaskStatusUpdateEvent, TaskArtifactUpdateEvent
  • Agent card: AgentCard, AgentInterface, AgentCapabilities, AgentSkill
  • Params: MessageSendParams, TaskQueryParams
  • Responses: SendMessageResponse, TaskListResponse
  • Errors: A2aError, A2aResult, ClientError, ClientResult, ServerError, ServerResult
  • Client: A2aClient, ClientBuilder, EventStream, RetryPolicy
  • Server: AgentExecutor, RequestHandler, RequestHandlerBuilder, RequestContext, EventQueueWriter, EventEmitter, Dispatcher
  • Dispatchers: JsonRpcDispatcher, RestDispatcher
  • Utilities: serve, serve_with_addr, RateLimitInterceptor, RateLimitConfig

External Dependencies

a2a-rust keeps its dependency tree lean:

  • serde / serde_json — JSON serialization
  • hyper / hyper-util — HTTP/1.1 and HTTP/2
  • http-body-util — HTTP body utilities
  • tokio — Async runtime
  • uuid — Task and message ID generation
  • bytes — Efficient byte buffers

No web framework (axum, actix, warp) is required — the dispatchers work directly with hyper.

Protocol Overview

The A2A (Agent-to-Agent) protocol defines how AI agents discover each other, exchange messages, manage task lifecycles, and stream results. This page covers the conceptual model — the "what" before the "how."

The Big Picture

An A2A interaction follows this flow:

  Client Agent                Agent (Server)
       │                           │
       │  1. GET /agent.json       │
       │ ─────────────────────────►│
       │          AgentCard        │
       │ ◄─────────────────────────│
       │                           │
       │  2. SendMessage           │
       │ ─────────────────────────►│
       │     Task (or SSE stream)  │
       │ ◄─────────────────────────│
       │                           │
       │  3. GetTask               │
       │ ─────────────────────────►│
       │          Task             │
       │ ◄─────────────────────────│
       │                           │
  1. Discovery — The client fetches the agent's card from /.well-known/agent.json
  2. Communication — The client sends a message and receives results
  3. Management — The client can query, cancel, or subscribe to tasks

Core Entities

Tasks

A Task is the central unit of work. When a client sends a message, the server creates a task that progresses through well-defined states:

                     ┌───────────┐
              ┌──────│ Submitted │──────┐
              │      └─────┬─────┘      │
              │            │            │
              ▼            ▼            ▼
         ┌────────┐   ┌────────┐   ┌──────────┐
    ┌───►│Working │   │Failed *│   │Rejected *│
    │    └───┬────┘   └────────┘   └──────────┘
    │        │
    │   ┌────┼──────────┬───────────┐
    │   │    │          │           │
    │   ▼    ▼          ▼           ▼
    │ ┌───────────┐ ┌─────────┐ ┌──────────┐
    │ │Completed *│ │ Input   │ │  Auth    │
    │ └───────────┘ │Required │ │ Required │
    │               └────┬────┘ └────┬─────┘
    │                    │           │
    └────────────────────┴───────────┘
      (InputRequired and AuthRequired
       also transition to Failed/Canceled)

    * = terminal state (no further transitions)

Terminal states (Completed, Failed, Canceled, Rejected) are final — no further transitions are allowed.
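
A minimal sketch of these rules, assuming only the transitions drawn above (the crate's actual validator may allow additional edges); the enum and function names here are illustrative, not the library's API:

```rust
#[derive(Debug, Clone, Copy, PartialEq)]
enum TaskState {
    Submitted,
    Working,
    InputRequired,
    AuthRequired,
    Completed,
    Failed,
    Canceled,
    Rejected,
}

/// Terminal states per the diagram: no outgoing transitions.
fn is_terminal(s: TaskState) -> bool {
    matches!(
        s,
        TaskState::Completed | TaskState::Failed | TaskState::Canceled | TaskState::Rejected
    )
}

/// Transitions permitted by the diagram above; a real validator would
/// reject anything else with a protocol error.
fn is_valid_transition(from: TaskState, to: TaskState) -> bool {
    use TaskState::*;
    match from {
        Submitted => matches!(to, Working | Failed | Rejected),
        Working => matches!(to, Completed | InputRequired | AuthRequired | Failed | Canceled),
        InputRequired | AuthRequired => matches!(to, Working | Failed | Canceled),
        _ => false, // terminal states have no outgoing edges
    }
}
```

Because the matches are exhaustive over the enum, adding a new protocol state forces every transition rule to be revisited at compile time.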

Messages

A Message is a structured payload sent between agents. Each message has:

  • A unique ID (MessageId)
  • A role — User (from the client) or Agent (from the server)
  • One or more Parts — the actual content

Parts

A Part is a content unit within a message. Three content types are supported:

  Type  Description                    Example
  Text  Plain text                     "Summarize this document"
  File  Inline bytes or URI reference  Image data, "https://example.com/doc.pdf"
  Data  Structured JSON                {"table": [...], "columns": [...]}

Artifacts

An Artifact is a result produced by an agent. Like messages, artifacts contain parts. Unlike messages, artifacts belong to a task and can be delivered incrementally via streaming.

Agent Cards

An Agent Card is the discovery document that describes an agent — its name, capabilities, skills, and how to connect. Think of it as a machine-readable business card.

Request/Response Model

A2A supports two communication styles:

Synchronous (SendMessage)

The client sends a message and blocks until the task is complete:

  Client                        Server
     │                             │
     │  SendMessage                │
     │ ───────────────────────────►│
     │                    Executor runs,
     │                    collects events
     │            Task             │
     │ ◄───────────────────────────│
     │                             │

Streaming (SendStreamingMessage)

The client sends a message and receives events in real time via SSE:

  Client                        Server
     │                             │
     │  SendStreamingMessage       │
     │ ───────────────────────────►│
     │    StatusUpdate: Working    │
     │ ◄───────────────────────────│
     │    ArtifactUpdate           │
     │ ◄───────────────────────────│
     │    ArtifactUpdate           │
     │ ◄───────────────────────────│
     │    StatusUpdate: Completed  │
     │ ◄───────────────────────────│
     │                             │

Streaming is ideal for long-running tasks where the client wants progress updates.

Contexts and Conversations

Tasks exist within a Context — a conversation thread. Multiple tasks can share the same context, allowing agents to maintain conversational state across interactions.

When a client sends a message with a context_id, the server groups that task with previous tasks in the same context. If no context_id is provided, the server creates a new one.

Multi-Tenancy

A2A supports multi-tenancy via an optional tenant field on all requests. This allows a single agent server to serve multiple isolated tenants, each with their own tasks and configurations.

In the REST transport, tenancy is expressed as a path prefix: /tenants/{tenant-id}/tasks/...
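
A sketch of how such tenant-scoped paths could be built (`task_path` is a hypothetical helper for illustration, not part of the library API):

```rust
// Hypothetical helper: build a REST task path, optionally tenant-scoped.
fn task_path(tenant: Option<&str>, task_id: &str) -> String {
    match tenant {
        Some(t) => format!("/tenants/{t}/tasks/{task_id}"),
        None => format!("/tasks/{task_id}"),
    }
}

fn main() {
    assert_eq!(task_path(Some("acme"), "task-1"), "/tenants/acme/tasks/task-1");
    assert_eq!(task_path(None, "task-1"), "/tasks/task-1");
}
```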

Next Steps

Transport Layers

A2A supports four transport bindings: JSON-RPC 2.0, REST, WebSocket (websocket feature flag), and gRPC (grpc feature flag). All four are first-class citizens in a2a-rust — the server can serve multiple transports simultaneously, and the client auto-selects based on the agent card.

JSON-RPC 2.0

The JSON-RPC transport sends all requests to a single endpoint as POST requests with a JSON-RPC 2.0 envelope:

{
  "jsonrpc": "2.0",
  "method": "SendMessage",
  "id": "req-1",
  "params": {
    "message": {
      "messageId": "msg-1",
      "role": "ROLE_USER",
      "parts": [{"text": "Hello, agent!"}]
    }
  }
}

Response:

{
  "jsonrpc": "2.0",
  "id": "req-1",
  "result": {
    "id": "task-abc",
    "contextId": "ctx-123",
    "status": { "state": "TASK_STATE_COMPLETED" },
    "artifacts": [...]
  }
}

Method Names

  A2A Operation       JSON-RPC Method
  Send message        SendMessage
  Stream message      SendStreamingMessage
  Get task            GetTask
  List tasks          ListTasks
  Cancel task         CancelTask
  Subscribe to task   SubscribeToTask
  Create push config  CreateTaskPushNotificationConfig
  Get push config     GetTaskPushNotificationConfig
  List push configs   ListTaskPushNotificationConfigs
  Delete push config  DeleteTaskPushNotificationConfig
  Extended card       GetExtendedAgentCard

Batching

JSON-RPC supports batch requests — multiple operations in a single HTTP request:

[
  {"jsonrpc": "2.0", "method": "GetTask", "id": "1", "params": {"id": "task-a"}},
  {"jsonrpc": "2.0", "method": "GetTask", "id": "2", "params": {"id": "task-b"}}
]

Note: Streaming methods (SendStreamingMessage, SubscribeToTask) cannot be used in batch requests and will return an error.

ID Handling

JSON-RPC request IDs can be strings, numbers (including 0 and floats), or null. The server preserves the exact ID type in the response.
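
The preservation rule can be pictured with a small enum (a sketch only; the library presumably models this with its own JSON value types):

```rust
// Illustration: a JSON-RPC id may be a string, a number, or null,
// and the response must echo back exactly what the request carried.
#[derive(Clone, Debug, PartialEq)]
enum JsonRpcId {
    Str(String),
    Num(f64),
    Null,
}

fn echo_id(request_id: &JsonRpcId) -> JsonRpcId {
    // The server never coerces the type: "1" stays a string, 1 stays a number.
    request_id.clone()
}

fn main() {
    let ids = [JsonRpcId::Str("req-1".into()), JsonRpcId::Num(0.0), JsonRpcId::Null];
    for id in &ids {
        assert_eq!(echo_id(id), *id);
    }
}
```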

REST

The REST transport uses standard HTTP methods and URL paths:

  Operation           Method  Path
  Send message        POST    /message:send
  Stream message      POST    /message:stream
  Get task            GET     /tasks/{id}
  List tasks          GET     /tasks
  Cancel task         POST    /tasks/{id}:cancel
  Subscribe           GET     /tasks/{id}:subscribe
  Create push config  POST    /tasks/{id}/pushNotificationConfigs
  Get push config     GET     /tasks/{id}/pushNotificationConfigs/{configId}
  List push configs   GET     /tasks/{id}/pushNotificationConfigs
  Delete push config  DELETE  /tasks/{id}/pushNotificationConfigs/{configId}
  Agent card          GET     /.well-known/agent.json

Multi-Tenant Paths

With tenancy, paths are prefixed: /tenants/{tenant-id}/tasks/{id}

Content Types

The REST dispatcher accepts both application/json and application/a2a+json.

Security

The REST dispatcher includes built-in protections:

  • Path traversal rejection — ".." in path segments (including percent-encoded %2E%2E) returns 400
  • Query string limits — Query strings over 4 KiB return 414
  • Body size limits — Request bodies over 4 MiB return 413
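
The traversal check reduces to inspecting each path segment. A simplified sketch of the rule (not the dispatcher's actual code, which decodes percent-encoding more thoroughly):

```rust
// Reject any path segment that is "..", including the percent-encoded
// forms %2E%2E / %2e%2e (simplified sketch).
fn has_traversal(path: &str) -> bool {
    path.split('/').any(|seg| {
        let decoded = seg.replace("%2E", ".").replace("%2e", ".");
        decoded == ".."
    })
}

fn main() {
    assert!(has_traversal("/tasks/../secrets"));
    assert!(has_traversal("/tasks/%2E%2E/secrets"));
    assert!(!has_traversal("/tasks/task-abc"));
}
```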

WebSocket

The WebSocket transport (websocket feature flag) provides a persistent bidirectional channel over a single TCP connection. JSON-RPC 2.0 messages are exchanged as WebSocket text frames.

# Server
a2a-protocol-server = { version = "0.2", features = ["websocket"] }

# Client
a2a-protocol-client = { version = "0.2", features = ["websocket"] }

Server

#![allow(unused)]
fn main() {
use a2a_protocol_server::{WebSocketDispatcher, RequestHandlerBuilder};
use std::sync::Arc;

let handler = Arc::new(RequestHandlerBuilder::new(my_executor).build().unwrap());
let dispatcher = Arc::new(WebSocketDispatcher::new(handler));

// Start accepting WebSocket connections
dispatcher.serve("0.0.0.0:3002").await?;
}

Protocol

  • Client sends JSON-RPC 2.0 requests as text frames
  • Server responds with JSON-RPC 2.0 responses as text frames
  • For streaming methods (SendStreamingMessage, SubscribeToTask), the server sends multiple frames — one per event — followed by a stream_complete response
  • Ping/pong frames are handled automatically
  • Connection closes cleanly on WebSocket close frame

Client

#![allow(unused)]
fn main() {
use a2a_protocol_client::{ClientBuilder, WebSocketTransport};

let transport = WebSocketTransport::connect("ws://agent.example.com:3002").await?;
let client = ClientBuilder::new("ws://agent.example.com:3002")
    .with_custom_transport(transport)
    .build()?;
}

When to Use WebSocket

  • Long-lived connections — Avoids TCP/TLS handshake overhead per request
  • Bidirectional streaming — Server can push events without SSE
  • Low latency — No HTTP framing overhead for small messages

gRPC

The gRPC transport (grpc feature flag) provides high-performance RPC via protocol buffers and HTTP/2. JSON payloads are carried inside protobuf bytes fields, reusing all existing serde types — no duplicate type definitions.

# Server
a2a-protocol-server = { version = "0.2", features = ["grpc"] }

# Client
a2a-protocol-client = { version = "0.2", features = ["grpc"] }

Server

#![allow(unused)]
fn main() {
use a2a_protocol_server::{GrpcConfig, GrpcDispatcher, RequestHandlerBuilder};
use std::sync::Arc;

let handler = Arc::new(RequestHandlerBuilder::new(my_executor).build().unwrap());
let config = GrpcConfig::default()
    .with_max_message_size(8 * 1024 * 1024);
let dispatcher = GrpcDispatcher::new(handler, config);
dispatcher.serve("0.0.0.0:50051").await?;
}

Tip: Use serve_with_listener() when you need to know the server address before constructing the handler (e.g., for agent cards with correct URLs). Pre-bind a TcpListener, extract the address, build your handler, then pass the listener.

Client

#![allow(unused)]
fn main() {
use a2a_protocol_client::{ClientBuilder, GrpcTransport};

let transport = GrpcTransport::connect("http://agent.example.com:50051").await?;
let client = ClientBuilder::new("http://agent.example.com:50051")
    .with_custom_transport(transport)
    .build()?;
}

Protocol

  • All 11 A2A methods are mapped to gRPC RPCs
  • Streaming methods (SendStreamingMessage, SubscribeToTask) use gRPC server streaming
  • JSON payloads are wrapped in JsonPayload { bytes data = 1 } protobuf messages
  • The proto definition is at proto/a2a.proto

When to Use gRPC

  • Service mesh integration — gRPC is native to Kubernetes, Istio, Envoy
  • Language interop — gRPC has code generation for 10+ languages
  • HTTP/2 multiplexing — Multiple RPCs over a single connection
  • Streaming — Native server streaming without SSE

Choosing a Transport

  Factor            JSON-RPC               REST                       WebSocket                gRPC
  Batch operations  Supported              Not supported              Not supported            Not supported
  Caching           Limited (POST-only)    HTTP cache-friendly (GET)  Not applicable           Not applicable
  Tooling           Needs JSON-RPC client  Standard HTTP tools work   WebSocket client needed  gRPC client needed
  URL structure     Single endpoint        Resource-oriented          Single connection        Single connection
  Streaming         SSE via POST           SSE via POST/GET           Native text frames       Native server streaming
  Connection reuse  HTTP keep-alive        HTTP keep-alive            Persistent connection    HTTP/2 multiplexing

JSON-RPC and REST use SSE for streaming. WebSocket uses native text frames. gRPC uses native server streaming over HTTP/2. The choice is mostly about ecosystem fit — JSON-RPC for agent-to-agent communication, REST for standard HTTP tooling, WebSocket for persistent low-latency connections, gRPC for service mesh and cross-language interop.

Running Multiple Transports

The server can serve several transports simultaneously on different ports (here, JSON-RPC and REST):

#![allow(unused)]
fn main() {
use a2a_protocol_sdk::server::{JsonRpcDispatcher, RestDispatcher, RequestHandlerBuilder};
use std::sync::Arc;

let handler = Arc::new(
    RequestHandlerBuilder::new(my_executor).build().unwrap()
);

// JSON-RPC on port 3000
let jsonrpc = Arc::new(JsonRpcDispatcher::new(Arc::clone(&handler)));

// REST on port 3001
let rest = Arc::new(RestDispatcher::new(handler));
}

All dispatchers share the same RequestHandler, which means they share the same task store, push config store, and executor.

Next Steps

Agent Cards & Discovery

An Agent Card is the machine-readable discovery document that describes an A2A agent. Clients fetch the card to learn what the agent can do, how to connect, and what security is required.

The Agent Card

Agent cards are served at /.well-known/agent.json and contain:

{
  "name": "Calculator Agent",
  "description": "Evaluates arithmetic expressions",
  "version": "1.0.0",
  "supportedInterfaces": [
    {
      "url": "https://agent.example.com/rpc",
      "protocolBinding": "JSONRPC",
      "protocolVersion": "1.0.0"
    },
    {
      "url": "https://agent.example.com/api",
      "protocolBinding": "REST",
      "protocolVersion": "1.0.0"
    }
  ],
  "defaultInputModes": ["text/plain"],
  "defaultOutputModes": ["text/plain"],
  "skills": [
    {
      "id": "calc",
      "name": "Calculator",
      "description": "Evaluates expressions like '3 + 5'",
      "tags": ["math", "calculator"],
      "examples": ["3 + 5", "10 * 2"]
    }
  ],
  "capabilities": {
    "streaming": true,
    "pushNotifications": false
  }
}

Required Fields

  Field                Description
  name                 Human-readable agent name
  description          What the agent does
  version              Semantic version of this agent
  supportedInterfaces  At least one interface with URL and protocol binding
  defaultInputModes    MIME types accepted (e.g., ["text/plain"])
  defaultOutputModes   MIME types produced
  skills               At least one skill describing a capability
  capabilities         Capability flags (streaming, push, etc.)

Skills

Skills describe discrete capabilities:

#![allow(unused)]
fn main() {
use a2a_protocol_sdk::types::agent_card::AgentSkill;

let skill = AgentSkill {
    id: "summarize".into(),
    name: "Summarizer".into(),
    description: "Summarizes long documents".into(),
    tags: vec!["nlp".into(), "summarization".into()],
    examples: Some(vec![
        "Summarize this research paper".into(),
        "Give me a 3-sentence summary".into(),
    ]),
    input_modes: Some(vec!["text/plain".into(), "application/pdf".into()]),
    output_modes: Some(vec!["text/plain".into()]),
    security_requirements: None,
};
}

Skills can override the agent's default input/output modes and declare their own security requirements.

Capabilities

The AgentCapabilities struct advertises what the agent supports:

#![allow(unused)]
fn main() {
use a2a_protocol_sdk::prelude::AgentCapabilities;

let caps = AgentCapabilities::none()
    .with_streaming(true)           // Supports SendStreamingMessage
    .with_push_notifications(true)  // Supports push notification configs
    .with_extended_agent_card(true); // Has an authenticated extended card
}

Note: AgentCapabilities is #[non_exhaustive] — always construct it via AgentCapabilities::none() and the builder methods, never with a struct literal.

Interfaces

Each interface describes a transport endpoint:

#![allow(unused)]
fn main() {
use a2a_protocol_sdk::types::agent_card::AgentInterface;

let interface = AgentInterface {
    url: "https://agent.example.com/rpc".into(),
    protocol_binding: "JSONRPC".into(),  // or "REST", "GRPC"
    protocol_version: "1.0.0".into(),
    tenant: None, // Optional: fixed tenant for this interface
};
}

An agent must have at least one interface. Having multiple interfaces (e.g., JSON-RPC and REST) lets clients choose their preferred transport.

Serving Agent Cards

Static Handler

For agent cards that don't change at runtime:

#![allow(unused)]
fn main() {
use a2a_protocol_sdk::server::RequestHandlerBuilder;

let handler = RequestHandlerBuilder::new(my_executor)
    .with_agent_card(make_agent_card())
    .build()
    .unwrap();
}

The static handler automatically provides:

  • ETag headers for cache validation
  • Last-Modified timestamps
  • Cache-Control directives
  • 304 Not Modified responses for conditional requests

Dynamic Handler

For agent cards that change (e.g., based on feature flags, load, or authentication):

#![allow(unused)]
fn main() {
use a2a_protocol_sdk::server::{AgentCardProducer, DynamicAgentCardHandler};
use a2a_protocol_sdk::types::agent_card::AgentCard;
use std::future::Future;
use std::pin::Pin;

struct MyCardProducer;

impl AgentCardProducer for MyCardProducer {
    fn produce<'a>(&'a self) -> Pin<Box<dyn Future<Output = A2aResult<AgentCard>> + Send + 'a>> {
        Box::pin(async move {
            // Generate card dynamically
            Ok(make_agent_card())
        })
    }
}
}

The dynamic handler calls the producer on every request, computes a fresh ETag, and handles conditional caching.

Hot-Reload Handler

For agent cards loaded from a JSON file that may change at runtime:

#![allow(unused)]
fn main() {
use a2a_protocol_sdk::server::HotReloadAgentCardHandler;
use std::path::Path;
use std::time::Duration;

let handler = HotReloadAgentCardHandler::new(initial_card);

// Cross-platform: poll the file every 30 seconds
let watcher = handler.spawn_poll_watcher(
    Path::new("/etc/a2a/agent.json"),
    Duration::from_secs(30),
);

// Unix only: reload on SIGHUP
#[cfg(unix)]
let signal_watcher = handler.spawn_signal_watcher(
    Path::new("/etc/a2a/agent.json"),
);
}

HotReloadAgentCardHandler implements AgentCardProducer, so it plugs directly into DynamicAgentCardHandler for full HTTP caching support. The internal Arc<RwLock<AgentCard>> makes card swaps atomic while keeping concurrent reads cheap.

HTTP Caching

Agent card responses include standard HTTP caching headers (RFC 7232):

  Header         Purpose
  ETag           Content hash for cache validation
  Last-Modified  Timestamp of last change
  Cache-Control  public, max-age=60 (configurable)

Clients should send If-None-Match or If-Modified-Since headers. If the card hasn't changed, the server returns 304 Not Modified with no body.
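
The conditional-request logic boils down to an ETag comparison. A minimal sketch (illustration only; the real handler also checks If-Modified-Since and sets the caching headers):

```rust
// Sketch: return 304 when the client's If-None-Match matches the current ETag.
fn card_status(current_etag: &str, if_none_match: Option<&str>) -> u16 {
    match if_none_match {
        Some(tag) if tag == current_etag => 304, // unchanged: no body sent
        _ => 200,                                // changed or unconditional: full card
    }
}

fn main() {
    assert_eq!(card_status("\"abc123\"", Some("\"abc123\"")), 304);
    assert_eq!(card_status("\"abc123\"", Some("\"old\"")), 200);
    assert_eq!(card_status("\"abc123\"", None), 200);
}
```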

Security

Agent cards can declare security requirements using OpenAPI-style schemes:

{
  "securitySchemes": {
    "bearer": {
      "type": "http",
      "scheme": "bearer"
    }
  },
  "securityRequirements": [
    { "bearer": [] }
  ]
}

Individual skills can also declare their own security requirements, overriding the global ones.

Extended Agent Cards

If the agent supports extendedAgentCard capability, clients can fetch an authenticated version with additional details via the GetExtendedAgentCard method.

Next Steps

Tasks & Messages

Tasks and messages are the core data model of the A2A protocol. Understanding their structure and lifecycle is essential for building agents.

Tasks

A Task represents a unit of work. Every SendMessage call creates one.

Task Structure

#![allow(unused)]
fn main() {
pub struct Task {
    pub id: TaskId,                          // Server-assigned unique ID
    pub context_id: ContextId,               // Conversation thread ID
    pub status: TaskStatus,                  // Current state + optional message
    pub history: Option<Vec<Message>>,       // Previous messages in context
    pub artifacts: Option<Vec<Artifact>>,    // Produced results
    pub metadata: Option<serde_json::Value>, // Arbitrary key-value data
}
}

Task States

Tasks follow a state machine with validated transitions:

  State          Meaning                       Terminal?
  Submitted      Received, not yet started     No
  Working        Actively processing           No
  InputRequired  Needs more input from client  No
  AuthRequired   Needs authentication          No
  Completed      Finished successfully         Yes
  Failed         Finished with error           Yes
  Canceled       Canceled by client            Yes
  Rejected       Rejected before execution     Yes

Valid Transitions

Not all state transitions are allowed. The library enforces these rules:

#![allow(unused)]
fn main() {
use a2a_protocol_sdk::prelude::TaskState;

// Check if a transition is valid
assert!(TaskState::Submitted.can_transition_to(TaskState::Working));
assert!(TaskState::Working.can_transition_to(TaskState::Completed));

// Terminal states cannot transition
assert!(!TaskState::Completed.can_transition_to(TaskState::Working));
assert!(!TaskState::Failed.can_transition_to(TaskState::Working));

// Check if a state is terminal
assert!(TaskState::Completed.is_terminal());
assert!(!TaskState::Working.is_terminal());
}

Task Status

The status combines a state with an optional message and timestamp:

#![allow(unused)]
fn main() {
use a2a_protocol_sdk::prelude::{TaskStatus, TaskState};

// Without timestamp
let status = TaskStatus::new(TaskState::Working);

// With automatic UTC timestamp
let status = TaskStatus::with_timestamp(TaskState::Completed);
}

Wire Format

On the wire, task states use the TASK_STATE_ prefix:

{
  "id": "task-abc",
  "contextId": "ctx-123",
  "status": {
    "state": "TASK_STATE_COMPLETED",
    "timestamp": "2026-03-15T10:30:00Z"
  },
  "artifacts": [...]
}

Messages

A Message is a structured payload exchanged between client and agent:

#![allow(unused)]
fn main() {
pub struct Message {
    pub id: MessageId,                           // Unique message ID
    pub role: MessageRole,                       // User or Agent
    pub parts: Vec<Part>,                        // Content (≥1 part)
    pub task_id: Option<TaskId>,                 // Associated task
    pub context_id: Option<ContextId>,           // Conversation thread
    pub reference_task_ids: Option<Vec<TaskId>>, // Related tasks
    pub extensions: Option<Vec<String>>,         // Extension URIs
    pub metadata: Option<serde_json::Value>,
}
}

Roles

  Role   Wire Value  Meaning
  User   ROLE_USER   From the client/human side
  Agent  ROLE_AGENT  From the agent/server side

Creating Messages

#![allow(unused)]
fn main() {
use a2a_protocol_sdk::prelude::*;

let message = Message {
    id: MessageId::new(uuid::Uuid::new_v4().to_string()),
    role: MessageRole::User,
    parts: vec![Part::text("What is 2 + 2?")],
    task_id: None,
    context_id: None,
    reference_task_ids: None,
    extensions: None,
    metadata: None,
};
}

Parts

Parts are the content units within messages and artifacts. Three types are supported:

Text

#![allow(unused)]
fn main() {
let part = Part::text("Hello, agent!");
}

Wire format: {"type": "text", "text": "Hello, agent!"}

File (bytes or URI)

#![allow(unused)]
fn main() {
// Inline bytes (base64-encoded)
let part = Part::file_bytes(base64_encoded_string);

// URI reference
let part = Part::file_uri("https://example.com/document.pdf");
}

Wire format (bytes): {"type": "file", "file": {"bytes": "aGVsbG8="}}

Wire format (URI): {"type": "file", "file": {"uri": "https://example.com/document.pdf"}}

Structured Data

#![allow(unused)]
fn main() {
let part = Part::data(serde_json::json!({
    "table": [
        {"name": "Alice", "score": 95},
        {"name": "Bob", "score": 87}
    ]
}));
}

Wire format: {"type": "data", "data": {"table": [...]}}

Part Metadata

Any part can carry optional metadata:

{
  "type": "text",
  "text": "Hello",
  "metadata": {"language": "en"}
}

Artifacts

Artifacts are results produced by an agent, delivered as part of a task:

#![allow(unused)]
fn main() {
pub struct Artifact {
    pub id: ArtifactId,
    pub name: Option<String>,
    pub description: Option<String>,
    pub parts: Vec<Part>,                    // ≥1 part
    pub extensions: Option<Vec<String>>,
    pub metadata: Option<serde_json::Value>,
}
}

Create an artifact:

#![allow(unused)]
fn main() {
use a2a_protocol_sdk::prelude::*;

let artifact = Artifact::new(
    "result-1",
    vec![Part::text("The answer is 42")],
);
}

Streaming Artifacts

Artifacts can be delivered incrementally during streaming:

#![allow(unused)]
fn main() {
// First chunk
queue.write(StreamResponse::ArtifactUpdate(TaskArtifactUpdateEvent {
    task_id: ctx.task_id.clone(),
    context_id: ContextId::new(ctx.context_id.clone()),
    artifact: Artifact::new("doc", vec![Part::text("First paragraph...")]),
    append: None,
    last_chunk: Some(false),  // More chunks coming
    metadata: None,
})).await?;

// Final chunk
queue.write(StreamResponse::ArtifactUpdate(TaskArtifactUpdateEvent {
    task_id: ctx.task_id.clone(),
    context_id: ContextId::new(ctx.context_id.clone()),
    artifact: Artifact::new("doc", vec![Part::text("Last paragraph.")]),
    append: Some(true),       // Append to previous
    last_chunk: Some(true),   // This is the last chunk
    metadata: None,
})).await?;
}
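
On the receiving side, the append flag tells the client whether each chunk replaces or extends the buffered artifact content. A minimal reassembly sketch (illustration only, for text parts):

```rust
// Sketch: reassemble streamed artifact text chunks using the append flag.
fn apply_chunk(buffer: &mut String, text: &str, append: Option<bool>) {
    if append.unwrap_or(false) {
        buffer.push_str(text); // extend the existing content
    } else {
        buffer.clear();        // replace it
        buffer.push_str(text);
    }
}

fn main() {
    let mut doc = String::new();
    apply_chunk(&mut doc, "First paragraph...", None);
    apply_chunk(&mut doc, " Last paragraph.", Some(true));
    assert_eq!(doc, "First paragraph... Last paragraph.");
}
```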

ID Types

a2a-rust uses newtype wrappers for type safety:

  Type        Wraps   Example
  TaskId      String  TaskId::new("task-abc")
  ContextId   String  ContextId::new("ctx-123")
  MessageId   String  MessageId::new("msg-456")
  ArtifactId  String  Constructed inside Artifact::new

These prevent accidentally passing a task ID where a context ID is expected.
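
This is the standard Rust newtype idiom. A quick sketch of why it helps (simplified stand-ins for the library's types):

```rust
// Sketch of the newtype idiom: each ID is a distinct type, so mixing
// them up is a compile error rather than a runtime bug.
#[derive(Debug, Clone, PartialEq)]
struct TaskId(String);
#[derive(Debug, Clone, PartialEq)]
struct ContextId(String);

fn describe_task(id: &TaskId) -> String {
    format!("task {}", id.0)
}

fn main() {
    let task = TaskId("task-abc".into());
    let _ctx = ContextId("ctx-123".into());
    assert_eq!(describe_task(&task), "task task-abc");
    // describe_task(&_ctx); // would not compile: expected TaskId, found ContextId
}
```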

Next Steps

Streaming with SSE

A2A uses Server-Sent Events (SSE) for real-time streaming. This enables agents to deliver progress updates, partial results, and artifacts as they're produced — instead of making the client wait for the complete response.

How SSE Streaming Works

When a client calls SendStreamingMessage, the server holds the HTTP connection open and sends events as they occur:

HTTP/1.1 200 OK
Content-Type: text/event-stream

data: {"taskId":"t-1","contextId":"ctx-1","status":{"state":"TASK_STATE_WORKING"}}

data: {"taskId":"t-1","contextId":"ctx-1","artifact":{"artifactId":"a-1","parts":[{"text":"partial..."}]},"lastChunk":false}

data: {"taskId":"t-1","contextId":"ctx-1","artifact":{"artifactId":"a-1","parts":[{"text":"complete result"}]},"lastChunk":true}

data: {"taskId":"t-1","contextId":"ctx-1","status":{"state":"TASK_STATE_COMPLETED"}}

Each data: line is a complete JSON object. Events are separated by blank lines.
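
A minimal parser for this framing can be sketched as follows (illustration only — the real client additionally handles partial lines at TCP frame boundaries, buffer caps, and timeouts):

```rust
// Sketch: split an SSE body into events (blank-line separated) and
// collect the JSON payload carried on each "data:" line.
fn parse_sse(body: &str) -> Vec<String> {
    body.split("\n\n")
        .filter_map(|event| {
            event.lines()
                .find_map(|line| line.strip_prefix("data: "))
                .map(str::to_string)
        })
        .collect()
}

fn main() {
    let body = "data: {\"a\":1}\n\ndata: {\"b\":2}\n\n";
    assert_eq!(parse_sse(body), vec!["{\"a\":1}", "{\"b\":2}"]);
}
```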

Stream Event Types

Four types of events can appear in a stream:

StatusUpdate

Reports a task state transition:

{
  "taskId": "task-abc",
  "contextId": "ctx-123",
  "status": {
    "state": "TASK_STATE_WORKING",
    "timestamp": "2026-03-15T10:30:00Z"
  }
}

ArtifactUpdate

Delivers artifact content (potentially in chunks):

{
  "taskId": "task-abc",
  "contextId": "ctx-123",
  "artifact": {
    "artifactId": "result-1",
    "parts": [{"text": "The answer is..."}]
  },
  "lastChunk": false,
  "append": false
}

Task

A complete task snapshot (usually the final event):

{
  "id": "task-abc",
  "contextId": "ctx-123",
  "status": {"state": "TASK_STATE_COMPLETED"},
  "artifacts": [...]
}

Message

A direct message response (for simple request/reply patterns):

{
  "messageId": "msg-456",
  "role": "ROLE_AGENT",
  "parts": [{"text": "Quick answer"}]
}

Server-Side: Writing Events

In your AgentExecutor, write events to the queue:

#![allow(unused)]
fn main() {
impl AgentExecutor for MyExecutor {
    fn execute<'a>(
        &'a self,
        ctx: &'a RequestContext,
        queue: &'a dyn EventQueueWriter,
    ) -> Pin<Box<dyn Future<Output = A2aResult<()>> + Send + 'a>> {
        Box::pin(async move {
            // Signal start
            queue.write(StreamResponse::StatusUpdate(TaskStatusUpdateEvent {
                task_id: ctx.task_id.clone(),
                context_id: ContextId::new(ctx.context_id.clone()),
                status: TaskStatus::new(TaskState::Working),
                metadata: None,
            })).await?;

            // Deliver results in chunks
            for (i, chunk) in results.iter().enumerate() {
                queue.write(StreamResponse::ArtifactUpdate(
                    TaskArtifactUpdateEvent {
                        task_id: ctx.task_id.clone(),
                        context_id: ContextId::new(ctx.context_id.clone()),
                        artifact: Artifact::new("output", vec![Part::text(chunk)]),
                        append: Some(i > 0),
                        last_chunk: Some(i == results.len() - 1),
                        metadata: None,
                    }
                )).await?;
            }

            // Signal completion
            queue.write(StreamResponse::StatusUpdate(TaskStatusUpdateEvent {
                task_id: ctx.task_id.clone(),
                context_id: ContextId::new(ctx.context_id.clone()),
                status: TaskStatus::new(TaskState::Completed),
                metadata: None,
            })).await?;

            Ok(())
        })
    }
}
}

Queue Limits

The event queue uses tokio::sync::broadcast channels for fan-out to multiple subscribers:

  Limit           Default    Purpose
  Queue capacity  64 events  Broadcast channel ring buffer size
  Max event size  16 MiB     Rejects oversized events

With broadcast channels, writes never block — if a reader is too slow, it receives a Lagged notification and skips missed events. The task store is the source of truth; SSE is best-effort notification.

Configure these via the builder:

#![allow(unused)]
fn main() {
RequestHandlerBuilder::new(executor)
    .with_event_queue_capacity(128)
    .with_max_event_size(8 * 1024 * 1024)  // 8 MiB
    .build()
}

Client-Side: Consuming Streams

Use stream_message to receive events:

#![allow(unused)]
fn main() {
let mut stream = client
    .stream_message(params)
    .await
    .expect("connect");

while let Some(event) = stream.next().await {
    match event {
        Ok(StreamResponse::StatusUpdate(ev)) => {
            println!("State: {:?}", ev.status.state);
        }
        Ok(StreamResponse::ArtifactUpdate(ev)) => {
            println!("Artifact: {}", ev.artifact.id);
        }
        Ok(StreamResponse::Task(task)) => {
            println!("Final task: {:?}", task.status.state);
        }
        Ok(StreamResponse::Message(msg)) => {
            println!("Message: {:?}", msg);
        }
        Ok(_) => {
            // Future event types — handle gracefully
        }
        Err(e) => {
            eprintln!("Stream error: {e}");
            break;
        }
    }
}
}

Client Protections

The SSE parser includes safety limits:

  • 16 MiB buffer cap — Prevents OOM from malicious servers
  • 30-second connect timeout — Fails fast on unreachable servers
  • Partial line buffering — Handles TCP frame boundaries correctly

Re-subscribing

If a stream disconnects, re-subscribe to an existing task:

#![allow(unused)]
fn main() {
let mut stream = client
    .subscribe_to_task("task-abc")
    .await
    .expect("resubscribe");
}

The server creates a new broadcast subscriber for the task's event queue. Multiple SSE connections can be active simultaneously for the same task — each receives all events published after it subscribes. If a reader falls behind, it receives a Lagged notification and skips missed events rather than blocking other readers or the writer.

Streaming vs Synchronous

  Aspect      SendMessage              SendStreamingMessage
  Response    Complete task            SSE event stream
  Progress    No intermediate updates  Real-time updates
  Long tasks  Client waits             Client sees progress
  Network     Single request/response  Held connection
  Complexity  Simple                   Requires event handling

Use streaming when:

  • Tasks take more than a few seconds
  • You want to show progress to users
  • You need incremental artifact delivery

Use synchronous when:

  • Tasks complete quickly
  • You don't need progress updates
  • Simplicity is more important than responsiveness

Next Steps

The AgentExecutor Trait

The AgentExecutor trait is the heart of every A2A agent. It defines what happens when a message arrives. Everything else — HTTP handling, task management, streaming — is handled by the framework.

The Trait

#![allow(unused)]
fn main() {
pub trait AgentExecutor: Send + Sync + 'static {
    /// Called when a message arrives (SendMessage or SendStreamingMessage).
    fn execute<'a>(
        &'a self,
        ctx: &'a RequestContext,
        queue: &'a dyn EventQueueWriter,
    ) -> Pin<Box<dyn Future<Output = A2aResult<()>> + Send + 'a>>;

    /// Called when a client requests task cancellation.
    /// Default: returns "task not cancelable" error.
    fn cancel<'a>(
        &'a self,
        ctx: &'a RequestContext,
        queue: &'a dyn EventQueueWriter,
    ) -> Pin<Box<dyn Future<Output = A2aResult<()>> + Send + 'a>>;

    /// Called during graceful server shutdown.
    /// Default: no-op.
    fn on_shutdown<'a>(
        &'a self,
    ) -> Pin<Box<dyn Future<Output = ()> + Send + 'a>>;
}
}

Why Pin<Box<dyn Future>>?

This signature ensures object safety — the trait can be stored as Arc<dyn AgentExecutor> and shared across threads. Standard async fn in traits would prevent this. The Box::pin(async move { ... }) wrapper is the idiomatic pattern:

#![allow(unused)]
fn main() {
impl AgentExecutor for MyAgent {
    fn execute<'a>(
        &'a self,
        ctx: &'a RequestContext,
        queue: &'a dyn EventQueueWriter,
    ) -> Pin<Box<dyn Future<Output = A2aResult<()>> + Send + 'a>> {
        Box::pin(async move {
            // Your async logic here
            Ok(())
        })
    }
}
}

Ergonomic Helpers

The executor_helpers module provides shortcuts to reduce boilerplate.

boxed_future helper

Wraps an async block into the required Pin<Box<dyn Future>>:

#![allow(unused)]
fn main() {
use a2a_protocol_server::executor_helpers::boxed_future;

fn execute<'a>(&'a self, ctx: &'a RequestContext, queue: &'a dyn EventQueueWriter)
    -> Pin<Box<dyn Future<Output = A2aResult<()>> + Send + 'a>>
{
    boxed_future(async move {
        // Your logic here — no Box::pin wrapper needed!
        Ok(())
    })
}
}

agent_executor! macro

Generates the full AgentExecutor impl from a closure-like syntax:

#![allow(unused)]
fn main() {
use a2a_protocol_server::agent_executor;

struct EchoAgent;

// Simple form (execute only)
agent_executor!(EchoAgent, |ctx, queue| async {
    Ok(())
});

// With cancel handler
agent_executor!(CancelableAgent,
    execute: |ctx, queue| async { Ok(()) },
    cancel: |ctx, queue| async { Ok(()) }
);
}

EventEmitter helper

Eliminates the repetitive task_id.clone() / context_id.clone() in every event:

#![allow(unused)]
fn main() {
use a2a_protocol_server::executor_helpers::EventEmitter;

agent_executor!(MyAgent, |ctx, queue| async {
    let emit = EventEmitter::new(ctx, queue);

    emit.status(TaskState::Working).await?;
    emit.artifact("result", vec![Part::text("done")], None, Some(true)).await?;

    if emit.is_cancelled() {
        emit.status(TaskState::Canceled).await?;
        return Ok(());
    }

    emit.status(TaskState::Completed).await?;
    Ok(())
});
}

| Method | Description |
|---|---|
| status(TaskState) | Emit a status update event |
| artifact(id, parts, append, last_chunk) | Emit an artifact update event |
| is_cancelled() | Check if the task was cancelled |

RequestContext

The RequestContext provides information about the incoming request:

| Field | Type | Description |
|---|---|---|
| task_id | TaskId | Server-assigned task ID |
| context_id | String | Conversation context ID |
| message | Message | The incoming message with parts |
| stored_task | Option<Task> | Previously stored task snapshot (for continuations) |
| metadata | Option<Value> | Arbitrary metadata from the request |
| cancellation_token | CancellationToken | Token for cooperative cancellation |

EventQueueWriter

The queue is your channel for sending events back to the client:

#![allow(unused)]
fn main() {
// Write a status update
queue.write(StreamResponse::StatusUpdate(TaskStatusUpdateEvent {
    task_id: ctx.task_id.clone(),
    context_id: ContextId::new(ctx.context_id.clone()),
    status: TaskStatus::new(TaskState::Working),
    metadata: None,
})).await?;

// Write an artifact
queue.write(StreamResponse::ArtifactUpdate(TaskArtifactUpdateEvent {
    task_id: ctx.task_id.clone(),
    context_id: ContextId::new(ctx.context_id.clone()),
    artifact: Artifact::new("result", vec![Part::text("output")]),
    append: None,
    last_chunk: Some(true),
    metadata: None,
})).await?;
}

For synchronous clients (SendMessage), the handler collects all events and assembles the final Task response. For streaming clients (SendStreamingMessage), events are delivered as SSE in real time. Your executor doesn't need to know which mode the client used — just write events to the queue.
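That mode-agnostic contract can be sketched with a toy queue — plain std::sync::mpsc standing in for the library's EventQueueWriter, and a small Event enum standing in for StreamResponse. The executor writes the same events either way; only the consumer differs:

```rust
use std::sync::mpsc;

// Toy stand-in for StreamResponse.
#[derive(Debug, Clone, PartialEq)]
enum Event {
    Status(&'static str),
    Artifact(&'static str),
}

// The "executor": writes events without knowing how they'll be delivered.
fn run_executor(queue: &mpsc::Sender<Event>) {
    queue.send(Event::Status("working")).unwrap();
    queue.send(Event::Artifact("result")).unwrap();
    queue.send(Event::Status("completed")).unwrap();
}

// SendMessage mode: drain the queue and fold everything into a final
// response. (Streaming mode would instead forward each event as SSE.)
fn collect_final(rx: mpsc::Receiver<Event>) -> Vec<Event> {
    rx.into_iter().collect()
}
```

The split between producer and consumer is the whole point: the handler, not the executor, decides whether events are buffered or streamed.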

Common Patterns

The Standard Three-Event Pattern

Most executors follow this structure:

#![allow(unused)]
fn main() {
Box::pin(async move {
    // 1. Working
    queue.write(StreamResponse::StatusUpdate(/* Working */)).await?;

    // 2. Produce results
    let result = do_work(&ctx.message).await?;
    queue.write(StreamResponse::ArtifactUpdate(/* result */)).await?;

    // 3. Completed
    queue.write(StreamResponse::StatusUpdate(/* Completed */)).await?;

    Ok(())
})
}

Error Handling

If your executor encounters an error, transition to Failed with a descriptive message:

#![allow(unused)]
fn main() {
Box::pin(async move {
    queue.write(/* Working */).await?;

    match do_work(&ctx.message).await {
        Ok(result) => {
            queue.write(/* ArtifactUpdate with result */).await?;
            queue.write(/* Completed */).await?;
        }
        Err(e) => {
            queue.write(StreamResponse::StatusUpdate(TaskStatusUpdateEvent {
                task_id: ctx.task_id.clone(),
                context_id: ContextId::new(ctx.context_id.clone()),
                status: TaskStatus {
                    state: TaskState::Failed,
                    message: Some(Message {
                        id: MessageId::new(uuid::Uuid::new_v4().to_string()),
                        role: MessageRole::Agent,
                        parts: vec![Part::text(&format!("Error: {e}"))],
                        task_id: None,
                        context_id: None,
                        reference_task_ids: None,
                        extensions: None,
                        metadata: None,
                    }),
                    timestamp: None,
                },
                metadata: None,
            })).await?;
        }
    }

    Ok(())
})
}

Requesting More Input

When the agent needs clarification:

#![allow(unused)]
fn main() {
queue.write(StreamResponse::StatusUpdate(TaskStatusUpdateEvent {
    task_id: ctx.task_id.clone(),
    context_id: ContextId::new(ctx.context_id.clone()),
    status: TaskStatus {
        state: TaskState::InputRequired,
        message: Some(Message {
            id: MessageId::new(uuid::Uuid::new_v4().to_string()),
            role: MessageRole::Agent,
            parts: vec![Part::text("Which format would you like: PDF or HTML?")],
            // ...remaining fields
            task_id: None, context_id: None, reference_task_ids: None,
            extensions: None, metadata: None,
        }),
        timestamp: None,
    },
    metadata: None,
})).await?;
}

The client can then send another message with the same context_id to continue the conversation.

Supporting Cancellation

Override the cancel method:

#![allow(unused)]
fn main() {
fn cancel<'a>(
    &'a self,
    ctx: &'a RequestContext,
    queue: &'a dyn EventQueueWriter,
) -> Pin<Box<dyn Future<Output = A2aResult<()>> + Send + 'a>> {
    Box::pin(async move {
        // Clean up any in-progress work
        self.cancel_token.cancel();

        queue.write(StreamResponse::StatusUpdate(TaskStatusUpdateEvent {
            task_id: ctx.task_id.clone(),
            context_id: ContextId::new(ctx.context_id.clone()),
            status: TaskStatus::new(TaskState::Canceled),
            metadata: None,
        })).await?;

        Ok(())
    })
}
}

Executor with State

Executors can hold state — database connections, model handles, configuration:

#![allow(unused)]
fn main() {
struct LlmExecutor {
    model: Arc<Model>,
    db: Arc<DatabasePool>,
    max_tokens: usize,
}

impl AgentExecutor for LlmExecutor {
    fn execute<'a>(
        &'a self,
        ctx: &'a RequestContext,
        queue: &'a dyn EventQueueWriter,
    ) -> Pin<Box<dyn Future<Output = A2aResult<()>> + Send + 'a>> {
        Box::pin(async move {
            // Access self.model, self.db, self.max_tokens
            let response = self.model.generate(&input, self.max_tokens).await?;
            // ...
            Ok(())
        })
    }
}
}

Because the trait requires Send + Sync + 'static, the executor must be safe to share across threads. Use Arc for shared state.

Executor Timeout

The builder can set a timeout that aborts hung executors:

#![allow(unused)]
fn main() {
use std::time::Duration;

RequestHandlerBuilder::new(my_executor)
    .with_executor_timeout(Duration::from_secs(300))  // 5 minutes
    .build()
}

If the executor doesn't complete within the timeout, the task transitions to Failed automatically.

Next Steps

Request Handler & Builder

The RequestHandler is the central orchestrator that connects your executor to the protocol. It manages task lifecycle, storage, streaming, push notifications, and interceptors. You build one using RequestHandlerBuilder.

Building a Handler

Minimal Setup

#![allow(unused)]
fn main() {
use a2a_protocol_sdk::server::RequestHandlerBuilder;

let handler = RequestHandlerBuilder::new(MyExecutor)
    .build()
    .expect("build handler");
}

This gives you sensible defaults:

  • In-memory task store
  • In-memory push config store
  • No push sender (webhooks disabled)
  • No interceptors
  • No agent card
  • No executor timeout

Full Configuration

#![allow(unused)]
fn main() {
use a2a_protocol_sdk::server::RequestHandlerBuilder;
use std::time::Duration;

let handler = RequestHandlerBuilder::new(MyExecutor)
    // Agent card for discovery
    .with_agent_card(make_agent_card())

    // Task storage
    .with_task_store_config(TaskStoreConfig {
        task_ttl: Some(Duration::from_secs(3600)),  // 1 hour TTL
        max_capacity: Some(10_000),                 // Max 10k tasks
        ..Default::default()
    })

    // Push notifications
    .with_push_sender(HttpPushSender::new())

    // Interceptors
    .with_interceptor(AuthInterceptor::new())
    .with_interceptor(LoggingInterceptor::new())

    // Executor limits
    .with_executor_timeout(Duration::from_secs(300))

    // Streaming limits
    .with_event_queue_capacity(128)
    .with_max_event_size(8 * 1024 * 1024)    // 8 MiB
    .with_max_concurrent_streams(1000)

    .build()
    .expect("build handler");
}

Builder Methods Reference

Required

| Method | Description |
|---|---|
| new(executor) | Set the agent executor (type-erased to Arc<dyn AgentExecutor>) |

Optional

| Method | Default | Description |
|---|---|---|
| with_agent_card(AgentCard) | None | Discovery card for /.well-known/agent.json |
| with_task_store(impl TaskStore) | InMemoryTaskStore | Custom task storage backend |
| with_task_store_config(TaskStoreConfig) | 1 hr TTL, 10k capacity | TTL and capacity for the default store |
| with_push_config_store(impl PushConfigStore) | InMemoryPushConfigStore | Custom push config storage |
| with_push_sender(impl PushSender) | None | Webhook delivery implementation |
| with_interceptor(impl ServerInterceptor) | Empty chain | Add a server interceptor |
| with_executor_timeout(Duration) | None | Timeout for executor completion |
| with_event_queue_capacity(usize) | 64 | Bounded channel size per stream |
| with_max_event_size(usize) | 16 MiB | Maximum serialized event size |
| with_max_concurrent_streams(usize) | Unbounded | Limit concurrent SSE streams |

Build-Time Validation

build() validates:

  • If an agent card is provided, it must have at least one supported_interfaces entry
  • Executor timeout (if set) must not be zero

Sharing the Handler

The handler is wrapped in Arc for sharing between dispatchers:

#![allow(unused)]
fn main() {
use std::sync::Arc;

let handler = Arc::new(
    RequestHandlerBuilder::new(MyExecutor)
        .build()
        .unwrap()
);

// Both dispatchers share the same handler
let jsonrpc = JsonRpcDispatcher::new(Arc::clone(&handler));
let rest = RestDispatcher::new(handler);
}

This means JSON-RPC and REST clients share the same task store, push configs, and executor.

Task Store Configuration

The default InMemoryTaskStore supports TTL and capacity limits:

#![allow(unused)]
fn main() {
use a2a_protocol_sdk::server::TaskStoreConfig;
use std::time::Duration;

let config = TaskStoreConfig {
    task_ttl: Some(Duration::from_secs(3600)),  // Tasks expire after 1 hour
    max_capacity: Some(50_000),                 // Keep at most 50k tasks
    ..Default::default()
};

RequestHandlerBuilder::new(executor)
    .with_task_store_config(config)
    .build()
}

When capacity is exceeded, the oldest tasks are evicted. When TTL expires, tasks are cleaned up on the next access.
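Those two eviction rules can be modeled with a toy store on a logical clock — a sketch of the semantics only, not InMemoryTaskStore's actual implementation:

```rust
use std::collections::VecDeque;

// Toy store: (insertion_tick, task_id) in insertion order, oldest first.
struct ToyStore {
    max_capacity: usize,
    ttl_ticks: u64,
    tasks: VecDeque<(u64, String)>,
}

impl ToyStore {
    fn save(&mut self, now: u64, id: &str) {
        self.tasks.push_back((now, id.to_string()));
        // Capacity rule: evict the oldest entries once over the limit.
        while self.tasks.len() > self.max_capacity {
            self.tasks.pop_front();
        }
    }

    fn get(&mut self, now: u64, id: &str) -> Option<&str> {
        // TTL rule: expired entries are cleaned up on the next access.
        self.tasks.retain(|(t, _)| now - t < self.ttl_ticks);
        self.tasks.iter().find(|(_, i)| i == id).map(|(_, i)| i.as_str())
    }
}
```

Lazy TTL cleanup on access keeps the happy path free of background timers; the capacity check on insert bounds memory even if nothing is ever read back.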

Custom Task Stores

For production use, implement the TaskStore trait for your database:

#![allow(unused)]
fn main() {
use a2a_protocol_sdk::server::TaskStore;

struct PostgresTaskStore { /* ... */ }

impl TaskStore for PostgresTaskStore {
    // Implement get, put, list, delete...
}

RequestHandlerBuilder::new(executor)
    .with_task_store(PostgresTaskStore::new(pool))
    .build()
}

See Task & Config Stores for the full trait API.

Next Steps

Dispatchers (JSON-RPC, REST & gRPC)

Dispatchers translate HTTP/gRPC requests into handler calls. a2a-rust provides four built-in dispatchers: JsonRpcDispatcher, RestDispatcher, WebSocketDispatcher (websocket feature), and GrpcDispatcher (grpc feature).

JsonRpcDispatcher

Routes JSON-RPC 2.0 requests to the handler:

#![allow(unused)]
fn main() {
use a2a_protocol_sdk::server::JsonRpcDispatcher;
use std::sync::Arc;

let dispatcher = Arc::new(JsonRpcDispatcher::new(handler));
}

Features

  • Single endpoint — All methods go to / as POST requests
  • Agent cardGET /.well-known/agent.json returns the agent card (same as REST)
  • Batch support — Handles JSON-RPC batch arrays
  • ID preservation — Echoes back the exact request ID (string, number, float, null)
  • StreamingSendStreamingMessage and SubscribeToTask return SSE streams
  • CORS — Configurable cross-origin headers
  • Content type — Accepts application/json

Batch Restrictions

Streaming methods cannot appear in batch requests:

  • SendStreamingMessage in a batch → error response
  • SubscribeToTask in a batch → error response

An empty batch [] returns a parse error.

RestDispatcher

Routes RESTful HTTP requests to the handler:

#![allow(unused)]
fn main() {
use a2a_protocol_sdk::server::RestDispatcher;
use std::sync::Arc;

let dispatcher = Arc::new(RestDispatcher::new(handler));
}

Route Table

| Method | Path | Handler |
|---|---|---|
| POST | /message:send | SendMessage |
| POST | /message:stream | SendStreamingMessage |
| GET | /tasks | ListTasks |
| GET | /tasks/{id} | GetTask |
| POST | /tasks/{id}:cancel | CancelTask |
| GET | /tasks/{id}:subscribe | SubscribeToTask |
| POST | /tasks/{id}/pushNotificationConfigs | CreatePushConfig |
| GET | /tasks/{id}/pushNotificationConfigs | ListPushConfigs |
| GET | /tasks/{id}/pushNotificationConfigs/{cfgId} | GetPushConfig |
| DELETE | /tasks/{id}/pushNotificationConfigs/{cfgId} | DeletePushConfig |
| GET | /.well-known/agent.json | AgentCard |

Multi-Tenancy

Tenant routes are prefixed with /tenants/{tenant-id}/:

GET /tenants/acme-corp/tasks
GET /tenants/acme-corp/tasks/{id}
POST /tenants/acme-corp/message:send

Built-in Security

The REST dispatcher includes automatic protections:

| Protection | Behavior |
|---|---|
| Path traversal | .. in path segments (including %2E%2E, %2e%2e) → 400 |
| Query string size | Over 4 KiB → 414 |
| Body size | Over 4 MiB → 413 |
| Content type | Accepts application/json and application/a2a+json |
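The traversal check can be sketched as: percent-decode each path segment once, then reject any segment that decodes to "..". This is a simplified model of the dispatcher's behavior, assuming ASCII paths:

```rust
// Simplified traversal check: decode each segment, reject "..".
fn has_traversal(path: &str) -> bool {
    path.split('/').any(|seg| percent_decode(seg) == "..")
}

// Minimal single-pass percent decoder (ASCII input assumed).
fn percent_decode(s: &str) -> String {
    let bytes = s.as_bytes();
    let mut out = String::new();
    let mut i = 0;
    while i < bytes.len() {
        if bytes[i] == b'%' && i + 2 < bytes.len() {
            if let Ok(v) = u8::from_str_radix(&s[i + 1..i + 3], 16) {
                out.push(v as char);
                i += 3;
                continue;
            }
        }
        out.push(bytes[i] as char);
        i += 1;
    }
    out
}
```

Decoding before comparing is what catches the %2E%2E and %2e%2e variants; matching whole segments (not substrings) avoids false positives on names that merely contain dots.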

Server Startup

Both dispatchers implement the Dispatcher trait, so you can use the serve() helper to eliminate hyper boilerplate:

#![allow(unused)]
fn main() {
use a2a_protocol_server::serve::{serve, serve_with_addr};

// Blocking — runs the accept loop on the current task
serve("127.0.0.1:3000", JsonRpcDispatcher::new(handler)).await?;

// Non-blocking — spawns the server and returns the bound address
let addr = serve_with_addr("127.0.0.1:0", dispatcher).await?;
println!("Listening on {addr}");
}

Manual wiring (advanced)

Both dispatchers also expose a dispatch method for direct hyper integration:

#![allow(unused)]
fn main() {
use std::sync::Arc;

async fn start_server(
    dispatcher: Arc<JsonRpcDispatcher>,
    addr: &str,
) {
    let listener = tokio::net::TcpListener::bind(addr)
        .await
        .expect("bind");

    loop {
        let (stream, _) = listener.accept().await.expect("accept");
        let io = hyper_util::rt::TokioIo::new(stream);
        let dispatcher = Arc::clone(&dispatcher);

        tokio::spawn(async move {
            let service = hyper::service::service_fn(move |req| {
                let d = Arc::clone(&dispatcher);
                async move {
                    Ok::<_, std::convert::Infallible>(d.dispatch(req).await)
                }
            });

            let _ = hyper_util::server::conn::auto::Builder::new(
                hyper_util::rt::TokioExecutor::new(),
            )
            .serve_connection(io, service)
            .await;
        });
    }
}
}

No web framework required — the dispatchers work directly with hyper's service layer.

GrpcDispatcher

Routes gRPC requests to the handler via tonic. Enable with the grpc feature flag:

[dependencies]
a2a-protocol-server = { version = "0.2", features = ["grpc"] }
#![allow(unused)]
fn main() {
use a2a_protocol_server::{GrpcDispatcher, GrpcConfig};
use std::sync::Arc;

let config = GrpcConfig::default()
    .with_max_message_size(8 * 1024 * 1024)
    .with_concurrency_limit(128);

let dispatcher = GrpcDispatcher::new(handler, config);

// Blocking server
dispatcher.serve("0.0.0.0:50051").await?;

// Non-blocking (returns bound address)
let addr = dispatcher.serve_with_addr("127.0.0.1:0").await?;
println!("gRPC listening on {addr}");

// Pre-bind pattern (when you need the address before building the handler)
let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await?;
let addr = listener.local_addr()?;
// ... build handler using addr for agent card URL ...
let dispatcher = GrpcDispatcher::new(handler, config);
let bound = dispatcher.serve_with_listener(listener)?;
}

GrpcConfig

| Field | Type | Default | Description |
|---|---|---|---|
| max_message_size | usize | 4 MiB | Maximum inbound/outbound message size |
| concurrency_limit | usize | 256 | Maximum concurrent gRPC requests per connection |
| stream_channel_capacity | usize | 64 | Bounded channel for streaming responses |

Protocol

All 11 A2A methods are mapped to gRPC RPCs. JSON payloads are carried inside protobuf bytes fields, reusing the same serde types as JSON-RPC and REST — no duplicate protobuf definitions needed.

Streaming methods (SendStreamingMessage, SubscribeToTask) use gRPC server streaming.

Custom Server Setup

For advanced scenarios, use into_service() to get a tonic service:

#![allow(unused)]
fn main() {
let svc = dispatcher.into_service();
tonic::transport::Server::builder()
    .add_service(svc)
    .serve(addr)
    .await?;
}

Running Multiple Transports

Serve JSON-RPC and REST on different ports with the same handler:

#![allow(unused)]
fn main() {
use a2a_protocol_server::serve::serve_with_addr;

let handler = Arc::new(
    RequestHandlerBuilder::new(MyExecutor)
        .with_agent_card(make_agent_card("http://localhost:3000", "http://localhost:3001"))
        .build()
        .unwrap(),
);

// JSON-RPC on port 3000
let jsonrpc_addr = serve_with_addr("127.0.0.1:3000", JsonRpcDispatcher::new(Arc::clone(&handler))).await?;

// REST on port 3001
let rest_addr = serve_with_addr("127.0.0.1:3001", RestDispatcher::new(handler)).await?;
}

CORS Configuration

Both dispatchers support CORS for browser-based clients:

#![allow(unused)]
fn main() {
use a2a_protocol_sdk::server::CorsConfig;

// The dispatchers handle OPTIONS preflight automatically.
// CORS headers are included on all responses.
}

Next Steps

Push Notifications

Push notifications let agents deliver results asynchronously via webhooks. Instead of the client holding an SSE connection open, the server POSTs events to a URL the client provides.

How Push Notifications Work

  Client              Agent Server          Client Webhook
     │                      │                      │
     │  CreatePushConfig    │                      │
     │ ────────────────────►│                      │
     │  Config with ID      │                      │
     │ ◄────────────────────│                      │
     │                      │                      │
     │  SendMessage         │                      │
     │ ────────────────────►│                      │
     │  Task (submitted)    │                      │
     │ ◄────────────────────│                      │
     │                      │                      │
     │                      │  Executor runs       │
     │                      │                      │
     │                      │  POST event          │
     │                      │ ────────────────────►│
     │                      │  POST event          │
     │                      │ ────────────────────►│
     │                      │                      │
  1. Client registers a webhook URL via CreateTaskPushNotificationConfig
  2. Client sends a message (with return_immediately: true for async)
  3. Agent processes the message and pushes events to the webhook

Setting Up Push Notifications

Server Side

Enable push by providing a PushSender:

#![allow(unused)]
fn main() {
use a2a_protocol_sdk::server::{RequestHandlerBuilder, HttpPushSender};

let handler = RequestHandlerBuilder::new(my_executor)
    .with_push_sender(HttpPushSender::new())
    .build()
    .unwrap();
}

The built-in HttpPushSender includes:

  • SSRF protection — Resolves URLs and rejects private/loopback IP addresses
  • Header injection prevention — Validates credentials contain no \r or \n
  • HTTPS validation — Optionally enforces HTTPS-only webhook URLs

Client Side

Register a push notification configuration:

#![allow(unused)]
fn main() {
use a2a_protocol_sdk::types::push::TaskPushNotificationConfig;

let config = TaskPushNotificationConfig::new(
    "task-abc",                          // Task to watch
    "https://my-service.com/webhook",    // Webhook URL
);

let saved = client.set_push_config(config).await?;
println!("Config ID: {:?}", saved.id);
}

Managing Push Configs

#![allow(unused)]
fn main() {
// List all configs for a task
let configs = client.list_push_configs(ListPushConfigsParams {
    tenant: None,
    task_id: "task-abc".into(),
    page_size: None,
    page_token: None,
}).await?;

// Get a specific config
let config = client.get_push_config("task-abc", "config-123").await?;

// Delete a config
client.delete_push_config("task-abc", "config-123").await?;
}

Authentication

Push configs support authentication for the webhook endpoint:

#![allow(unused)]
fn main() {
use a2a_protocol_sdk::types::push::{TaskPushNotificationConfig, AuthenticationInfo};

let mut config = TaskPushNotificationConfig::new("task-abc", "https://webhook.example.com");
config.authentication = Some(AuthenticationInfo {
    scheme: "bearer".into(),
    credentials: "my-secret-token".into(),
});
}

The server includes these credentials in the Authorization header when POSTing to the webhook.
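A minimal sketch of that header construction, assuming the simplified behavior described above (CR/LF rejection first, then scheme plus credentials):

```rust
// Sketch: validate credentials, then format the Authorization value.
// Returns None when the credentials would allow header injection.
fn build_auth_header(scheme: &str, credentials: &str) -> Option<String> {
    // Header injection prevention: no raw \r or \n allowed.
    if credentials.contains('\r') || credentials.contains('\n') {
        return None;
    }
    if scheme.eq_ignore_ascii_case("bearer") {
        Some(format!("Bearer {credentials}"))
    } else {
        Some(format!("{scheme} {credentials}"))
    }
}
```

Rejecting CR/LF before the value ever reaches an HTTP client is the cheap, robust defense: a credential like `"x\r\nX-Admin: true"` can never smuggle an extra header.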

Custom PushSender

Implement the PushSender trait for custom delivery:

#![allow(unused)]
fn main() {
use a2a_protocol_sdk::server::PushSender;

struct SqsPushSender {
    client: aws_sdk_sqs::Client,
}

impl PushSender for SqsPushSender {
    fn send<'a>(
        &'a self,
        config: &'a TaskPushNotificationConfig,
        event: &'a StreamResponse,
    ) -> Pin<Box<dyn Future<Output = A2aResult<()>> + Send + 'a>> {
        Box::pin(async move {
            // Send event to SQS instead of HTTP webhook
            Ok(())
        })
    }
}
}

Push Config Storage

The default InMemoryPushConfigStore stores configs in memory with per-task limits. For production, implement PushConfigStore:

#![allow(unused)]
fn main() {
use a2a_protocol_sdk::server::PushConfigStore;

struct PostgresPushConfigStore { /* ... */ }

impl PushConfigStore for PostgresPushConfigStore {
    // Implement create, get, list, delete...
}

RequestHandlerBuilder::new(executor)
    .with_push_config_store(PostgresPushConfigStore::new(pool))
    .build()
}

Security Considerations

  • Always use HTTPS for webhook URLs in production
  • The built-in HttpPushSender rejects private IP addresses to prevent SSRF attacks
  • Webhook credentials are validated for header injection characters
  • Consider rate limiting webhook delivery to prevent abuse

Next Steps

Interceptors & Middleware

Interceptors let you hook into the request/response pipeline on both the client and server side — for authentication, logging, metrics, rate limiting, or any cross-cutting concern.

Server Interceptors

Server interceptors run before and after the handler processes a request:

#![allow(unused)]
fn main() {
use a2a_protocol_sdk::server::ServerInterceptor;

struct LoggingInterceptor;

impl ServerInterceptor for LoggingInterceptor {
    // Intercept incoming requests and outgoing responses
}
}

Adding Interceptors

#![allow(unused)]
fn main() {
RequestHandlerBuilder::new(my_executor)
    .with_interceptor(AuthInterceptor::new(auth_config))
    .with_interceptor(LoggingInterceptor)
    .with_interceptor(MetricsInterceptor::new())
    .build()
}

Interceptors execute in the order they're added:

Request → Auth → Logging → Metrics → Handler → Metrics → Logging → Auth → Response
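That onion ordering can be demonstrated with a toy chain — the real ServerInterceptor trait has request/response hooks, modeled here as a recorded trace:

```rust
// Toy chain: interceptors see the request in registration order
// and the response in reverse order.
fn run_chain(chain: &[&str]) -> Vec<String> {
    let mut trace = Vec::new();
    for name in chain {
        trace.push(format!("{name}:request"));
    }
    trace.push("handler".to_string());
    for name in chain.iter().rev() {
        trace.push(format!("{name}:response"));
    }
    trace
}
```

This is why auth belongs first in the chain: it sees the request before anything else and the response after everything else.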

Example: Authentication

#![allow(unused)]
fn main() {
struct BearerAuthInterceptor {
    valid_tokens: HashSet<String>,
}

impl ServerInterceptor for BearerAuthInterceptor {
    // Check Authorization header before passing to handler
    // Return 401 if token is missing or invalid
}
}

Client Interceptors

Client interceptors modify outgoing requests and incoming responses:

#![allow(unused)]
fn main() {
use a2a_protocol_sdk::client::CallInterceptor;

struct RequestIdInterceptor;

impl CallInterceptor for RequestIdInterceptor {
    // Add X-Request-Id header to outgoing requests
    // Log the response status
}
}

Adding Client Interceptors

#![allow(unused)]
fn main() {
use a2a_protocol_sdk::client::ClientBuilder;

let client = ClientBuilder::new("http://agent.example.com".into())
    .with_interceptor(RequestIdInterceptor)
    .with_interceptor(RetryInterceptor::new(3))
    .build()
    .unwrap();
}

Common Patterns

Logging

Log method names, durations, and errors:

#![allow(unused)]
fn main() {
struct LoggingInterceptor;
// Log: "SendMessage completed in 42ms"
// Log: "GetTask failed: task not found (15ms)"
}

Metrics

Track request counts, latencies, error rates:

#![allow(unused)]
fn main() {
struct MetricsInterceptor {
    counter: Arc<AtomicU64>,
}
// Increment counter on each request
// Record latency histogram
}

Rate Limiting

The built-in RateLimitInterceptor provides per-caller fixed-window rate limiting:

#![allow(unused)]
fn main() {
use a2a_protocol_sdk::server::{RateLimitInterceptor, RateLimitConfig};
use std::sync::Arc;

let limiter = Arc::new(RateLimitInterceptor::new(RateLimitConfig {
    requests_per_window: 100,
    window_secs: 60,
}));

// Add to handler builder:
RequestHandlerBuilder::new(my_executor)
    .with_interceptor(limiter)
    .build()
}

Caller keys are derived from CallContext::caller_identity (set by auth interceptors), the X-Forwarded-For header, or "anonymous". For advanced use cases (sliding windows, distributed counters), implement a custom ServerInterceptor or use a reverse proxy.
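The fixed-window semantics can be sketched in a few lines, using logical seconds and a plain HashMap; the interceptor's actual internals may differ:

```rust
use std::collections::HashMap;

// Minimal fixed-window counter keyed by caller.
struct FixedWindow {
    requests_per_window: u64,
    window_secs: u64,
    // caller key -> (window start, count within that window)
    counters: HashMap<String, (u64, u64)>,
}

impl FixedWindow {
    fn allow(&mut self, caller: &str, now_secs: u64) -> bool {
        let window_start = now_secs - now_secs % self.window_secs;
        let entry = self
            .counters
            .entry(caller.to_string())
            .or_insert((window_start, 0));
        if entry.0 != window_start {
            // A new window has begun: reset the counter.
            *entry = (window_start, 0);
        }
        if entry.1 < self.requests_per_window {
            entry.1 += 1;
            true
        } else {
            false
        }
    }
}
```

Fixed windows are cheap but allow up to 2x the limit across a window boundary, which is the usual reason to reach for sliding windows or a proxy-level limiter instead.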

Interceptor Chain

Both client and server support ordered interceptor chains. The chain is built incrementally:

#![allow(unused)]
fn main() {
// Each .with_interceptor() call appends to the chain
builder
    .with_interceptor(first)    // Runs first on request, last on response
    .with_interceptor(second)   // Runs second on request, second-to-last on response
    .with_interceptor(third)    // Runs third on request, first on response
}

Next Steps

Task & Config Stores

a2a-rust uses pluggable storage backends for tasks and push notification configs. The built-in in-memory stores work for development and testing. For production, implement the traits for your database.

TaskStore Trait

The TaskStore trait defines how tasks are persisted:

#![allow(unused)]
fn main() {
pub trait TaskStore: Send + Sync + 'static {
    fn save(&self, task: Task)
        -> Pin<Box<dyn Future<Output = A2aResult<()>> + Send + '_>>;

    fn get(&self, id: &TaskId)
        -> Pin<Box<dyn Future<Output = A2aResult<Option<Task>>> + Send + '_>>;

    fn list(&self, params: &ListTasksParams)
        -> Pin<Box<dyn Future<Output = A2aResult<TaskListResponse>> + Send + '_>>;

    fn insert_if_absent(&self, task: Task)
        -> Pin<Box<dyn Future<Output = A2aResult<bool>> + Send + '_>>;

    fn delete(&self, id: &TaskId)
        -> Pin<Box<dyn Future<Output = A2aResult<()>> + Send + '_>>;

    /// Returns the total number of tasks. Default returns 0.
    fn count(&self)
        -> Pin<Box<dyn Future<Output = A2aResult<u64>> + Send + '_>>;
}
}

InMemoryTaskStore

The default implementation with optional TTL and capacity limits:

#![allow(unused)]
fn main() {
use a2a_protocol_sdk::server::{InMemoryTaskStore, TaskStoreConfig};
use std::time::Duration;

// Default: 1hr TTL, 10k capacity
let store = InMemoryTaskStore::new();

// With custom limits
let store = InMemoryTaskStore::with_config(TaskStoreConfig {
    task_ttl: Some(Duration::from_secs(7200)),  // 2 hour TTL
    max_capacity: Some(50_000),
    ..Default::default()
});
}

Features:

  • Thread-safe (DashMap-based or RwLock<HashMap>)
  • Automatic TTL eviction on access
  • Capacity eviction (oldest first) when limit exceeded
  • Pagination support with cursor tokens
  • Filtering by context_id, status, and timestamp

SqliteTaskStore (feature-gated)

Enable the sqlite feature for a production-ready persistent store:

[dependencies]
a2a-protocol-server = { version = "0.2", features = ["sqlite"] }
#![allow(unused)]
fn main() {
use a2a_protocol_server::store::SqliteTaskStore;

let store = SqliteTaskStore::new("sqlite:tasks.db").await?;
// Or use an in-memory database for testing:
let store = SqliteTaskStore::new("sqlite::memory:").await?;
}

Features:

  • Auto-creates schema on first use
  • Stores tasks as JSON blobs with indexed context_id and state columns
  • Cursor-based pagination via id > ? ordering
  • Atomic insert_if_absent via INSERT OR IGNORE
  • Upsert via ON CONFLICT DO UPDATE

TenantAwareInMemoryTaskStore

For multi-tenant deployments, use TenantAwareInMemoryTaskStore which provides full tenant isolation using tokio::task_local!:

#![allow(unused)]
fn main() {
use a2a_protocol_server::store::{TenantAwareInMemoryTaskStore, TenantContext};
use std::sync::Arc;

let store = Arc::new(TenantAwareInMemoryTaskStore::new());

// Each tenant gets an independent store instance.
// Use TenantContext::scope() to set the active tenant:
TenantContext::scope("tenant-alpha".to_string(), {
    let store = store.clone();
    async move {
        store.save(task).await.unwrap();
    }
}).await;

// Tasks saved under one tenant are invisible to others:
TenantContext::scope("tenant-beta".to_string(), {
    let store = store.clone();
    async move {
        let result = store.get(&task_id).await.unwrap();
        assert!(result.is_none()); // tenant-beta can't see tenant-alpha's task
    }
}).await;

// Track tenant count for capacity monitoring:
let count = store.tenant_count().await;
}

The TenantContext::scope() pattern uses tokio::task_local! to thread the tenant ID through the async call stack without passing it as a parameter. The RequestHandler automatically sets the tenant scope when params.tenant is populated.

TenantAwareSqliteTaskStore (feature-gated)

For persistent multi-tenant storage, enable the sqlite feature:

#![allow(unused)]
fn main() {
use a2a_protocol_server::store::TenantAwareSqliteTaskStore;

let store = TenantAwareSqliteTaskStore::new("sqlite:tasks.db").await?;
}

This variant partitions data by a tenant_id column instead of using task-local storage, making it suitable for production deployments where tenants may span multiple server instances.

Note: Corresponding TenantAwareInMemoryPushConfigStore and TenantAwareSqlitePushConfigStore variants exist for push notification config storage.

Custom Implementation

#![allow(unused)]
fn main() {
struct PostgresTaskStore {
    pool: sqlx::PgPool,
}

impl TaskStore for PostgresTaskStore {
    fn get<'a>(&'a self, id: &'a TaskId)
        -> Pin<Box<dyn Future<Output = A2aResult<Option<Task>>> + Send + 'a>>
    {
        Box::pin(async move {
            let data: Option<serde_json::Value> =
                sqlx::query_scalar("SELECT data FROM tasks WHERE id = $1")
                    .bind(id.as_ref())
                    .fetch_optional(&self.pool)
                    .await
                    .map_err(|e| A2aError::internal(e.to_string()))?;

            // A corrupt row becomes an error, not a panic.
            data.map(|d| {
                serde_json::from_value(d)
                    .map_err(|e| A2aError::internal(e.to_string()))
            })
            .transpose()
        })
    }

    // ... implement save, list, delete, insert_if_absent, count similarly
}
}

PushConfigStore Trait

The PushConfigStore trait manages push notification configurations:

#![allow(unused)]
fn main() {
pub trait PushConfigStore: Send + Sync + 'static {
    fn set(&self, config: TaskPushNotificationConfig)
        -> Pin<Box<dyn Future<Output = A2aResult<TaskPushNotificationConfig>> + Send + '_>>;

    fn get(&self, task_id: &str, id: &str)
        -> Pin<Box<dyn Future<Output = A2aResult<Option<TaskPushNotificationConfig>>> + Send + '_>>;

    fn list(&self, task_id: &str)
        -> Pin<Box<dyn Future<Output = A2aResult<Vec<TaskPushNotificationConfig>>> + Send + '_>>;

    fn delete(&self, task_id: &str, id: &str)
        -> Pin<Box<dyn Future<Output = A2aResult<()>> + Send + '_>>;
}
}

InMemoryPushConfigStore

The default implementation stores configs in a HashMap:

#![allow(unused)]
fn main() {
use a2a_protocol_sdk::server::InMemoryPushConfigStore;

let store = InMemoryPushConfigStore::new();
}

Features:

  • Server-assigned config IDs (UUIDs)
  • Per-task config limits (prevents abuse)
  • Thread-safe access

Wiring Custom Stores

#![allow(unused)]
fn main() {
let handler = RequestHandlerBuilder::new(executor)
    .with_task_store(PostgresTaskStore::new(pool.clone()))
    .with_push_config_store(PostgresPushConfigStore::new(pool))
    .build()
    .unwrap();
}

Design Considerations

Object Safety

Both traits use Pin<Box<dyn Future>> return types for object safety. This allows the handler to store them as Arc<dyn TaskStore>.

Tenant Isolation

Tenant isolation uses tokio::task_local! via TenantContext::scope(), not method parameters. For in-memory stores, TenantAwareInMemoryTaskStore automatically partitions data by tenant. For SQL stores, TenantAwareSqliteTaskStore partitions by a tenant_id column. If you implement a custom store, use TenantContext::current_tenant() to retrieve the active tenant within the async call stack.
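The scoped-context pattern behind TenantContext can be sketched in plain std Rust with a thread-local (a sketch only: the real implementation uses tokio::task_local!, so the scope follows the async call stack rather than the OS thread):

```rust
use std::cell::RefCell;

thread_local! {
    // Sketch: the library scopes the tenant per async task, not per thread.
    static TENANT: RefCell<Option<String>> = RefCell::new(None);
}

/// Run `f` with `tenant` active, restoring the previous value afterwards.
pub fn with_tenant<T>(tenant: &str, f: impl FnOnce() -> T) -> T {
    let prev = TENANT.with(|t| t.replace(Some(tenant.to_string())));
    let out = f();
    TENANT.with(|t| *t.borrow_mut() = prev);
    out
}

/// What a custom store would call to find the active tenant.
pub fn current_tenant() -> Option<String> {
    TENANT.with(|t| t.borrow().clone())
}
```

The key property is the same in both versions: the tenant is ambient within the scope and absent outside it, so store methods never need an extra parameter.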

Pagination

The list method receives ListTasksParams with:

  • page_size — Number of results per page (1-100, default 50)
  • page_token — Opaque cursor for the next page
  • Various filter fields

Your implementation should return a TaskListResponse with a next_page_token if more results exist.
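A minimal cursor scheme, sketched with plain slices rather than the library's ListTasksParams/TaskListResponse types (here the opaque token simply encodes the next offset; production stores may prefer keyset cursors):

```rust
/// Return one page of `items` plus a cursor for the next page,
/// or None when the last page has been reached.
pub fn list_page<T: Clone>(
    items: &[T],
    page_size: usize,
    page_token: Option<&str>,
) -> (Vec<T>, Option<String>) {
    // The token is opaque to callers; this sketch encodes an offset.
    let start = page_token
        .and_then(|t| t.parse::<usize>().ok())
        .unwrap_or(0)
        .min(items.len());
    let end = (start + page_size).min(items.len());
    let page = items[start..end].to_vec();
    let next = if end < items.len() { Some(end.to_string()) } else { None };
    (page, next)
}
```

Whatever encoding you choose, the invariant to preserve is that a returned next_page_token, passed back unchanged, continues exactly where the previous page ended.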

Concurrency

Both traits require Send + Sync. Use connection pools, not single connections:

#![allow(unused)]
fn main() {
// Good
struct MyStore { pool: sqlx::PgPool }

// Bad — not Send + Sync
struct MyStore { conn: sqlx::PgConnection }
}

Next Steps

Building a Client

The ClientBuilder creates an A2aClient configured for your target agent. It handles transport selection, timeouts, authentication, and interceptors.

Basic Client

#![allow(unused)]
fn main() {
use a2a_protocol_sdk::client::ClientBuilder;

let client = ClientBuilder::new("http://agent.example.com".to_string())
    .build()
    .expect("build client");
}

The builder auto-selects the transport based on the URL. To explicitly choose:

#![allow(unused)]
fn main() {
// Force JSON-RPC transport
let client = ClientBuilder::new("http://agent.example.com".to_string())
    .with_protocol_binding("JSONRPC")
    .build()
    .unwrap();

// Force REST transport
let client = ClientBuilder::new("http://agent.example.com".to_string())
    .with_protocol_binding("REST")
    .build()
    .unwrap();
}

Configuration Options

Timeouts

#![allow(unused)]
fn main() {
use std::time::Duration;

let client = ClientBuilder::new(url)
    .with_timeout(Duration::from_secs(60))              // Per-request timeout (default: 30s)
    .with_connection_timeout(Duration::from_secs(5))     // TCP connect timeout (default: 10s)
    .with_stream_connect_timeout(Duration::from_secs(15)) // SSE connect timeout (default: 30s)
    .build()
    .unwrap();
}

Output Modes

Specify which MIME types the client can handle:

#![allow(unused)]
fn main() {
let client = ClientBuilder::new(url)
    .with_accepted_output_modes(vec![
        "text/plain".into(),
        "application/json".into(),
        "image/png".into(),
    ])
    .build()
    .unwrap();
}

Default: ["text/plain", "application/json"]

History Length

Control how many historical messages are included in responses:

#![allow(unused)]
fn main() {
let client = ClientBuilder::new(url)
    .with_history_length(10)  // Include last 10 messages
    .build()
    .unwrap();
}

Interceptors

Add request/response hooks:

#![allow(unused)]
fn main() {
let client = ClientBuilder::new(url)
    .with_interceptor(MyAuthInterceptor::new())
    .with_interceptor(LoggingInterceptor)
    .build()
    .unwrap();
}

Retry Policy

Enable automatic retries on transient failures (connection errors, timeouts, HTTP 429/502/503/504):

#![allow(unused)]
fn main() {
use a2a_protocol_client::RetryPolicy;

let client = ClientBuilder::new(url)
    .with_retry_policy(RetryPolicy::default())  // 3 retries, 500ms initial backoff
    .build()
    .unwrap();

// Custom retry configuration
let client = ClientBuilder::new(url)
    .with_retry_policy(
        RetryPolicy::default()
            .with_max_retries(5)
            .with_initial_backoff(Duration::from_secs(1))
            .with_max_backoff(Duration::from_secs(60))
            .with_backoff_multiplier(3.0),
    )
    .build()
    .unwrap();
}
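To see how the custom policy above spaces its attempts, here is the standard exponential-backoff arithmetic as a standalone sketch (assuming each delay is capped at the max backoff; the library's policy may also add jitter):

```rust
use std::time::Duration;

/// The wait before retry n is initial * multiplier^n, capped at `max`.
pub fn backoff_schedule(
    initial: Duration,
    multiplier: f64,
    max: Duration,
    max_retries: u32,
) -> Vec<Duration> {
    let mut delays = Vec::new();
    let mut d = initial.as_secs_f64();
    for _ in 0..max_retries {
        delays.push(Duration::from_secs_f64(d.min(max.as_secs_f64())));
        d *= multiplier;
    }
    delays
}
```

With the custom configuration above (1s initial, 3.0 multiplier, 60s cap, 5 retries) the schedule is 1s, 3s, 9s, 27s, then 60s: the fifth delay would be 81s but hits the cap.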

Return Immediately

For push notification workflows, return the task immediately without waiting for completion:

#![allow(unused)]
fn main() {
let client = ClientBuilder::new(url)
    .with_return_immediately(true)
    .build()
    .unwrap();
}

Builder Reference

Method                                   | Default                            | Description
-----------------------------------------|------------------------------------|----------------------------------------------
new(url)                                 | (required)                         | Base URL of the agent
with_protocol_binding(str)               | Auto-detect                        | Force transport: "JSONRPC", "REST", or "GRPC"
with_custom_transport(impl Transport)    | None                               | Use a custom transport (e.g., GrpcTransport)
with_timeout(Duration)                   | 30s                                | Per-request timeout
with_connection_timeout(Duration)        | 10s                                | TCP connection timeout
with_stream_connect_timeout(Duration)    | 30s                                | SSE stream connect timeout
with_retry_policy(RetryPolicy)           | None                               | Retry on transient errors
with_accepted_output_modes(Vec<String>)  | ["text/plain", "application/json"] | MIME types the client handles
with_history_length(u32)                 | None                               | Messages to include in responses
with_return_immediately(bool)            | false                              | Don't wait for task completion
with_interceptor(impl CallInterceptor)   | Empty chain                        | Add request/response hook

Client Reuse (Best Practice)

Create clients once and reuse them across requests. Each A2aClient holds a connection pool internally (via hyper), so reuse avoids repeated DNS resolution, TCP handshakes, and TLS negotiation on every call.

#![allow(unused)]
fn main() {
// ✅ Good: build once, reuse across requests
struct MyOrchestrator {
    analyzer: A2aClient,
    builder: A2aClient,
}

impl MyOrchestrator {
    fn new(analyzer_url: &str, builder_url: &str) -> Self {
        Self {
            analyzer: ClientBuilder::new(analyzer_url).build().unwrap(),
            builder: ClientBuilder::new(builder_url)
                .with_protocol_binding("REST")
                .build()
                .unwrap(),
        }
    }

    async fn run(&self) {
        // Reuse the same client for every request
        let _ = self.analyzer.send_message(params).await;
        let _ = self.builder.send_message(params).await;
    }
}
}
#![allow(unused)]
fn main() {
// ❌ Avoid: rebuilding the client on every call
async fn bad_pattern(url: &str) {
    // This works but wastes resources — connection pool is discarded each time
    let client = ClientBuilder::new(url).build().unwrap();
    let _ = client.send_message(params).await;
}
}

gRPC Client

For gRPC transport, use GrpcTransport::connect() with with_custom_transport():

#![allow(unused)]
fn main() {
use a2a_protocol_client::GrpcTransport;

let transport = GrpcTransport::connect("http://agent.example.com:50051").await?;
let client = ClientBuilder::new("http://agent.example.com:50051")
    .with_custom_transport(transport)
    .build()?;
}

Configure with GrpcTransportConfig:

#![allow(unused)]
fn main() {
use a2a_protocol_client::transport::grpc::{GrpcTransport, GrpcTransportConfig};
use std::time::Duration;

let config = GrpcTransportConfig::default()
    .with_timeout(Duration::from_secs(60))
    .with_max_message_size(8 * 1024 * 1024);

let transport = GrpcTransport::connect_with_config(
    "http://agent.example.com:50051",
    config,
).await?;
}

Thread Safety

A2aClient is Send + Sync and can be shared across tasks via Arc:

#![allow(unused)]
fn main() {
use std::sync::Arc;

let client = Arc::new(
    ClientBuilder::new(url).build().unwrap()
);

// Share across async tasks
let c1 = Arc::clone(&client);
tokio::spawn(async move { c1.send_message(params1).await });

let c2 = Arc::clone(&client);
tokio::spawn(async move { c2.send_message(params2).await });
}

Next Steps

Sending Messages

The most common operation: send a message to an agent and get a response.

Synchronous Send

send_message sends a message and waits for the task to complete:

#![allow(unused)]
fn main() {
use a2a_protocol_sdk::prelude::*;

let params = MessageSendParams {
    tenant: None,
    message: Message {
        id: MessageId::new(uuid::Uuid::new_v4().to_string()),
        role: MessageRole::User,
        parts: vec![Part::text("What is the capital of France?")],
        task_id: None,
        context_id: None,
        reference_task_ids: None,
        extensions: None,
        metadata: None,
    },
    configuration: None,
    metadata: None,
};

let response = client.send_message(params).await?;
}

Handling the Response

SendMessageResponse is an enum with two variants:

#![allow(unused)]
fn main() {
match response {
    SendMessageResponse::Task(task) => {
        println!("Task ID: {}", task.id);
        println!("Status: {:?}", task.status.state);

        // Extract text from artifacts
        if let Some(artifacts) = &task.artifacts {
            for artifact in artifacts {
                for part in &artifact.parts {
                    if let a2a_protocol_types::message::PartContent::Text { text } = &part.content {
                        println!("Result: {text}");
                    }
                }
            }
        }
    }
    SendMessageResponse::Message(msg) => {
        // Some agents respond with a direct message instead of a task
        println!("Direct message: {:?}", msg);
    }
    _ => {} // wildcard in case the enum gains variants in future versions
}
}

Configuration

Customize the send with SendMessageConfiguration:

#![allow(unused)]
fn main() {
use a2a_protocol_sdk::types::params::SendMessageConfiguration;

let params = MessageSendParams {
    tenant: None,
    message: make_message("Translate to French"),
    configuration: Some(SendMessageConfiguration {
        accepted_output_modes: vec!["text/plain".into()],
        task_push_notification_config: None,
        history_length: Some(5),       // Include last 5 messages
        return_immediately: Some(false), // Wait for completion
    }),
    metadata: None,
};
}

Continuing a Conversation

To continue a conversation, include the context_id from a previous task:

#![allow(unused)]
fn main() {
let first_response = client.send_message(MessageSendParams {
    message: Message {
        id: MessageId::new(uuid::Uuid::new_v4().to_string()),
        role: MessageRole::User,
        parts: vec![Part::text("Tell me about Rust")],
        task_id: None,
        context_id: None,  // New conversation
        reference_task_ids: None,
        extensions: None,
        metadata: None,
    },
    tenant: None,
    configuration: None,
    metadata: None,
}).await?;

// Get the context ID from the first response
let context_id = if let SendMessageResponse::Task(task) = &first_response {
    Some(task.context_id.clone())
} else {
    None
};

// Continue the conversation
let follow_up = client.send_message(MessageSendParams {
    message: Message {
        id: MessageId::new(uuid::Uuid::new_v4().to_string()),
        role: MessageRole::User,
        parts: vec![Part::text("What about error handling?")],
        task_id: None,
        context_id: context_id.map(|c| ContextId::new(c.to_string())),
        reference_task_ids: None,
        extensions: None,
        metadata: None,
    },
    tenant: None,
    configuration: None,
    metadata: None,
}).await?;
}

Multi-Part Messages

Send messages with multiple content types:

#![allow(unused)]
fn main() {
let message = Message {
    id: MessageId::new(uuid::Uuid::new_v4().to_string()),
    role: MessageRole::User,
    parts: vec![
        Part::text("Analyze this image:"),
        Part::url("https://example.com/chart.png"),
        Part::data(serde_json::json!({
            "analysis_type": "detailed",
            "language": "en"
        })),
    ],
    task_id: None,
    context_id: None,
    reference_task_ids: None,
    extensions: None,
    metadata: None,
};
}

Next Steps

Streaming Responses

For long-running tasks or when you want real-time progress, use stream_message to receive SSE events as the agent works.

Basic Streaming

#![allow(unused)]
fn main() {
let mut stream = client
    .stream_message(params)
    .await
    .expect("connect to stream");

while let Some(event) = stream.next().await {
    match event {
        Ok(StreamResponse::StatusUpdate(ev)) => {
            println!("Status: {:?}", ev.status.state);
        }
        Ok(StreamResponse::ArtifactUpdate(ev)) => {
            for part in &ev.artifact.parts {
                if let a2a_protocol_types::message::PartContent::Text { text } = &part.content {
                    print!("{text}");
                }
            }
            if ev.last_chunk == Some(true) {
                println!(); // Newline after final chunk
            }
        }
        Ok(StreamResponse::Task(task)) => {
            println!("Final: {:?}", task.status.state);
        }
        Ok(StreamResponse::Message(msg)) => {
            println!("Message: {:?}", msg);
        }
        Ok(_) => {
            // Future event types — handle gracefully
        }
        Err(e) => {
            eprintln!("Error: {e}");
            break;
        }
    }
}
}

Event Ordering

A typical stream delivers events in this order:

  1. StatusUpdate (Working)
  2. ArtifactUpdate (one or more, potentially chunked)
  3. StatusUpdate (Completed or Failed)
  4. Optionally, a final Task snapshot

Chunked Artifacts

Artifacts can be delivered in multiple chunks:

#![allow(unused)]
fn main() {
Ok(StreamResponse::ArtifactUpdate(ev)) => {
    let is_append = ev.append.unwrap_or(false);
    let is_last = ev.last_chunk.unwrap_or(false);

    if is_append {
        // Append to existing artifact
        buffer.push_str(&extract_text(&ev.artifact));
    } else {
        // New artifact or first chunk
        buffer = extract_text(&ev.artifact);
    }

    if is_last {
        println!("Complete artifact: {buffer}");
    }
}
}
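The append/last_chunk handling above can be factored into a small, independently testable assembler. A standalone sketch of the same logic, with the artifact text already extracted:

```rust
/// Accumulates artifact text chunks; returns the complete artifact text
/// when the final chunk arrives.
pub struct ChunkAssembler {
    buffer: String,
}

impl ChunkAssembler {
    pub fn new() -> Self {
        Self { buffer: String::new() }
    }

    /// `append = false` starts a new artifact; `last = true` completes it.
    pub fn apply(&mut self, text: &str, append: bool, last: bool) -> Option<String> {
        if append {
            self.buffer.push_str(text);
        } else {
            self.buffer = text.to_string();
        }
        if last {
            Some(std::mem::take(&mut self.buffer))
        } else {
            None
        }
    }
}
```

Keeping this logic out of the event loop makes it trivial to unit test the chunking edge cases (first chunk, append, replace) without a live stream.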

Re-subscribing

If a stream disconnects, re-subscribe to get the latest state:

#![allow(unused)]
fn main() {
let mut stream = client
    .subscribe_to_task("task-abc")
    .await?;

// Continue processing events...
while let Some(event) = stream.next().await {
    // ...
}
}

Stream Timeouts

The client has separate timeouts for stream connections:

#![allow(unused)]
fn main() {
use std::time::Duration;

let client = ClientBuilder::new(url)
    .with_stream_connect_timeout(Duration::from_secs(15))
    .build()?;
}

The connect timeout applies to establishing the SSE connection. Once connected, the stream stays open until the server closes it or an error occurs.

Safety Limits

The SSE parser protects against resource exhaustion:

Limit           | Value         | Purpose
----------------|---------------|-----------------------------------
Buffer cap      | 16 MiB        | Prevents OOM from oversized events
Connect timeout | 30s (default) | Fails fast on unreachable servers
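The buffer cap is conceptually a simple guard applied while accumulating SSE event bytes. A sketch of the idea (not the parser's actual code):

```rust
/// Append `chunk` to the event buffer, refusing growth past `cap` bytes.
pub fn push_capped(buf: &mut Vec<u8>, chunk: &[u8], cap: usize) -> Result<(), &'static str> {
    if buf.len() + chunk.len() > cap {
        return Err("SSE event exceeds buffer cap");
    }
    buf.extend_from_slice(chunk);
    Ok(())
}
```

Rejecting the chunk before copying it means a malicious or buggy server can never force the client to allocate more than the cap for a single event.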

Next Steps

Task Management

Beyond sending messages, the client provides methods for querying, listing, and canceling tasks.

Get a Task

Retrieve a task by ID:

#![allow(unused)]
fn main() {
use a2a_protocol_sdk::types::params::TaskQueryParams;

let task = client.get_task(TaskQueryParams {
    tenant: None,
    id: "task-abc".into(),
    history_length: Some(10),  // Include last 10 messages
}).await?;

println!("Task: {} ({:?})", task.id, task.status.state);

if let Some(artifacts) = &task.artifacts {
    println!("Artifacts: {}", artifacts.len());
}

if let Some(history) = &task.history {
    println!("Messages: {}", history.len());
}
}

List Tasks

Query tasks with filtering and pagination:

#![allow(unused)]
fn main() {
use a2a_protocol_sdk::types::params::ListTasksParams;

let response = client.list_tasks(ListTasksParams {
    tenant: None,
    context_id: Some("ctx-123".into()),       // Filter by context
    status: Some(TaskState::Completed),         // Filter by state
    page_size: Some(20),                        // 20 per page
    page_token: None,                           // First page
    status_timestamp_after: None,
    include_artifacts: Some(true),
    history_length: None,
}).await?;

for task in &response.tasks {
    println!("{}: {:?}", task.id, task.status.state);
}

// Paginate
if let Some(token) = &response.next_page_token {
    let next_page = client.list_tasks(ListTasksParams {
        page_token: Some(token.clone()),
        ..Default::default()
    }).await?;
}
}

Filtering Options

Parameter              | Description
-----------------------|-------------------------------------------
context_id             | Tasks in a specific conversation
status                 | Tasks in a specific state
status_timestamp_after | Tasks updated after a timestamp (ISO 8601)
page_size              | Results per page (1-100, default 50)
page_token             | Cursor for the next page
include_artifacts      | Include artifact data in results
history_length         | Number of messages per task

Cancel a Task

Request cancellation of a running task:

#![allow(unused)]
fn main() {
let task = client.cancel_task("task-abc").await?;

println!("Task state: {:?}", task.status.state);
// → Canceled (if the agent supports cancellation)
}

Cancellation is cooperative — the agent's executor must implement the cancel method. If the agent doesn't support cancellation, you'll get an error response.
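Cooperative cancellation means the executor must poll a flag between units of work. A standalone sketch of the pattern using an AtomicBool in place of the library's cancellation token (the real executor checks ctx.cancellation_token):

```rust
use std::sync::atomic::{AtomicBool, Ordering};

/// Process `steps`, checking the cancellation flag between each one.
/// Returns the steps that actually ran.
pub fn run_cancellable(cancelled: &AtomicBool, steps: &[&str]) -> Vec<String> {
    let mut completed = Vec::new();
    for step in steps {
        if cancelled.load(Ordering::Relaxed) {
            break; // cooperative: stop at the next check, never mid-step
        }
        completed.push(step.to_string());
    }
    completed
}
```

An executor that never checks the flag simply cannot be cancelled, which is exactly why agents that skip the check report cancellation as unsupported.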

Cancellation States

Current State | Can Cancel?
--------------|---------------------------------------
Submitted     | Yes → Canceled
Working       | Yes → Canceled (if agent supports it)
InputRequired | Yes → Canceled
AuthRequired  | Yes → Canceled
Completed     | No (terminal state)
Failed        | No (terminal state)
Canceled      | No (already canceled)
Rejected      | No (terminal state)

Next Steps

Error Handling

a2a-rust uses a layered error model: protocol-level errors (A2aError), client errors (ClientError), and server errors (ServerError).

A2aError (Protocol Level)

Protocol errors defined by the A2A spec:

#![allow(unused)]
fn main() {
use a2a_protocol_sdk::types::error::{A2aError, ErrorCode};

// Common error codes
ErrorCode::TaskNotFound         // Task doesn't exist
ErrorCode::TaskNotCancelable    // Agent doesn't support cancellation
ErrorCode::InvalidParams        // Bad request parameters
ErrorCode::MethodNotFound       // Unknown method
ErrorCode::InternalError        // Server-side failure
ErrorCode::UnsupportedOperation // Not implemented
}

Handling Protocol Errors

#![allow(unused)]
fn main() {
match client.get_task(params).await {
    Ok(task) => println!("Got task: {}", task.id),
    Err(e) => {
        // Check the error type
        eprintln!("Error: {e}");
    }
}
}

Client Errors

The client wraps transport and protocol errors:

#![allow(unused)]
fn main() {
match client.send_message(params).await {
    Ok(response) => { /* handle response */ }
    Err(e) => {
        // Transport errors (network, timeout, etc.)
        // Protocol errors (task not found, invalid params, etc.)
        // Parse errors (malformed response)
        eprintln!("Client error: {e}");
    }
}
}

Timeout Errors

#![allow(unused)]
fn main() {
// Per-request timeout
let client = ClientBuilder::new(url)
    .with_timeout(Duration::from_secs(5))
    .build()?;

// This will error if the agent takes longer than 5 seconds
match client.send_message(params).await {
    Ok(response) => { /* success */ }
    Err(e) => {
        // Could be a timeout error
        eprintln!("Failed (possibly timeout): {e}");
    }
}
}

Connection Errors

#![allow(unused)]
fn main() {
// Connection timeout
let client = ClientBuilder::new(url)
    .with_connection_timeout(Duration::from_secs(2))
    .build()?;
}

Automatic Retries

Use RetryPolicy to automatically retry transient errors:

#![allow(unused)]
fn main() {
use a2a_protocol_client::RetryPolicy;

let client = ClientBuilder::new(url)
    .with_retry_policy(RetryPolicy::default())
    .build()?;
}

You can check if an error is retryable programmatically:

#![allow(unused)]
fn main() {
match client.send_message(params).await {
    Err(e) if e.is_retryable() => println!("Transient error: {e}"),
    Err(e) => println!("Permanent error: {e}"),
    Ok(resp) => { /* ... */ }
}
}

Retryable errors include: Http, HttpClient, Timeout, and UnexpectedStatus with codes 429, 502, 503, or 504.

Server Errors

When building an agent, the ServerError type covers handler-level failures:

#![allow(unused)]
fn main() {
use a2a_protocol_sdk::server::ServerError;

// Server errors are returned by RequestHandlerBuilder::build()
// and by store/executor operations
}

Best Practices

Don't Panic

a2a-rust never panics in library code. All fallible operations return Result. Follow the same pattern in your executors:

#![allow(unused)]
fn main() {
// Good: return an error
return Err(A2aError::internal("processing failed".into()));

// Bad: panic
panic!("processing failed");
}

Executor Error Handling

In your AgentExecutor, catch errors and report them as status updates:

#![allow(unused)]
fn main() {
Box::pin(async move {
    queue.write(/* Working */).await?;

    match risky_operation().await {
        Ok(result) => {
            queue.write(/* ArtifactUpdate */).await?;
            queue.write(/* Completed */).await?;
        }
        Err(e) => {
            // Report failure through the protocol
            queue.write(StreamResponse::StatusUpdate(TaskStatusUpdateEvent {
                task_id: ctx.task_id.clone(),
                context_id: ContextId::new(ctx.context_id.clone()),
                status: TaskStatus {
                    state: TaskState::Failed,
                    message: Some(Message {
                        id: MessageId::new(uuid::Uuid::new_v4().to_string()),
                        role: MessageRole::Agent,
                        parts: vec![Part::text(&e.to_string())],
                        task_id: None,
                        context_id: None,
                        reference_task_ids: None,
                        extensions: None,
                        metadata: None,
                    }),
                    timestamp: None,
                },
                metadata: None,
            })).await?;
        }
    }

    Ok(())
})
}

Stream Error Recovery

For streaming, handle errors per-event:

#![allow(unused)]
fn main() {
while let Some(event) = stream.next().await {
    match event {
        Ok(ev) => { /* process event */ }
        Err(e) => {
            eprintln!("Stream error: {e}");
            // Decide: retry via resubscribe, or give up
            break;
        }
    }
}
}

Next Steps

Testing Your Agent

a2a-rust makes it easy to test agents at multiple levels: unit testing executors, integration testing with real HTTP, and property-based testing with fuzz targets.

Unit Testing Executors

Test your executor logic directly by creating a RequestContext and mock EventQueueWriter:

#![allow(unused)]
fn main() {
use a2a_protocol_sdk::prelude::*;
use a2a_protocol_server::streaming::event_queue::new_in_memory_queue;
use tokio_util::sync::CancellationToken;

#[tokio::test]
async fn test_calculator_executor() {
    let executor = CalcExecutor;

    // Create a writer/reader pair directly for unit testing
    let (writer, mut reader) = new_in_memory_queue();

    // Build the request context
    let ctx = RequestContext {
        task_id: TaskId::new("test-task"),
        context_id: "ctx-1".into(),
        message: Message {
            id: MessageId::new("msg-1"),
            role: MessageRole::User,
            parts: vec![Part::text("3 + 5")],
            task_id: None,
            context_id: None,
            reference_task_ids: None,
            extensions: None,
            metadata: None,
        },
        stored_task: None,
        metadata: None,
        cancellation_token: CancellationToken::new(),
    };

    // Run the executor
    executor.execute(&ctx, &*writer).await.unwrap();

    // Read events from the queue
    let events: Vec<_> = collect_events(&mut reader).await;

    // Verify: Working → ArtifactUpdate → Completed
    assert!(matches!(&events[0],
        StreamResponse::StatusUpdate(e) if e.status.state == TaskState::Working));
    assert!(matches!(&events[1],
        StreamResponse::ArtifactUpdate(e) if extract_text(&e.artifact) == "8"));
    assert!(matches!(&events[2],
        StreamResponse::StatusUpdate(e) if e.status.state == TaskState::Completed));
}
}

Integration Testing with HTTP

Test the full stack by starting a real server and using a client:

#![allow(unused)]
fn main() {
use a2a_protocol_sdk::server::{RequestHandlerBuilder, JsonRpcDispatcher};
use a2a_protocol_sdk::client::ClientBuilder;
use std::sync::Arc;

#[tokio::test]
async fn test_end_to_end() {
    // Build handler and server
    let handler = Arc::new(
        RequestHandlerBuilder::new(CalcExecutor).build().unwrap()
    );
    let dispatcher = Arc::new(JsonRpcDispatcher::new(handler));
    let addr = start_test_server(dispatcher).await;

    // Build client
    let client = ClientBuilder::new(format!("http://{addr}"))
        .build()
        .unwrap();

    // Send a message
    let response = client.send_message(MessageSendParams {
        tenant: None,
        message: Message {
            id: MessageId::new("test-msg"),
            role: MessageRole::User,
            parts: vec![Part::text("10 + 20")],
            task_id: None,
            context_id: None,
            reference_task_ids: None,
            extensions: None,
            metadata: None,
        },
        configuration: None,
        metadata: None,
    }).await.unwrap();

    // Verify
    if let SendMessageResponse::Task(task) = response {
        assert_eq!(task.status.state, TaskState::Completed);
    } else {
        panic!("expected task response");
    }
}

async fn start_test_server(
    dispatcher: Arc<JsonRpcDispatcher>,
) -> std::net::SocketAddr {
    let listener = tokio::net::TcpListener::bind("127.0.0.1:0")
        .await.unwrap();
    let addr = listener.local_addr().unwrap();

    tokio::spawn(async move {
        loop {
            let (stream, _) = listener.accept().await.unwrap();
            let io = hyper_util::rt::TokioIo::new(stream);
            let d = Arc::clone(&dispatcher);
            tokio::spawn(async move {
                let svc = hyper::service::service_fn(move |req| {
                    let d = Arc::clone(&d);
                    async move { Ok::<_, std::convert::Infallible>(d.dispatch(req).await) }
                });
                let _ = hyper_util::server::conn::auto::Builder::new(
                    hyper_util::rt::TokioExecutor::new(),
                ).serve_connection(io, svc).await;
            });
        }
    });

    addr
}
}

Testing Both Transports

Run the same tests against both JSON-RPC and REST:

#![allow(unused)]
fn main() {
#[tokio::test]
async fn test_jsonrpc_transport() {
    let addr = start_jsonrpc_server().await;
    let client = ClientBuilder::new(format!("http://{addr}")).build().unwrap();
    run_test_suite(&client).await;
}

#[tokio::test]
async fn test_rest_transport() {
    let addr = start_rest_server().await;
    let client = ClientBuilder::new(format!("http://{addr}"))
        .with_protocol_binding("REST")
        .build().unwrap();
    run_test_suite(&client).await;
}

async fn run_test_suite(client: &A2aClient) {
    // Test send_message, stream_message, get_task, etc.
}
}

Testing Streaming

#![allow(unused)]
fn main() {
#[tokio::test]
async fn test_streaming() {
    let addr = start_server().await;
    let client = ClientBuilder::new(format!("http://{addr}")).build().unwrap();

    let mut stream = client.stream_message(params).await.unwrap();
    let mut events = vec![];

    while let Some(event) = stream.next().await {
        events.push(event.unwrap());
    }

    // Verify event sequence
    assert!(events.len() >= 3); // Working + Artifact + Completed
}
}

Wire Format Tests

Verify JSON serialization matches the A2A spec:

#![allow(unused)]
fn main() {
#[test]
fn task_state_wire_format() {
    let status = TaskStatus::new(TaskState::Completed);
    let json = serde_json::to_string(&status).unwrap();
    assert!(json.contains("\"TASK_STATE_COMPLETED\""));
}

#[test]
fn message_role_wire_format() {
    let msg = Message {
        id: MessageId::new("m1"),
        role: MessageRole::User,
        parts: vec![Part::text("hi")],
        // ...
    };
    let json = serde_json::to_string(&msg).unwrap();
    assert!(json.contains("\"ROLE_USER\""));
    assert!(json.contains("\"messageId\""));
}
}

Fuzz Testing

The fuzz/ directory contains fuzz targets for JSON parsing:

# Requires nightly Rust
cd fuzz
cargo +nightly fuzz run fuzz_target

Fuzz testing helps find edge cases in JSON deserialization that unit tests miss.

Why No Single Test Type Is Enough

A key lesson from a2a-rust is that no single testing technique — not even all of them together minus one — is sufficient. Each layer catches a different class of bug, and the gaps between layers are where production incidents hide:

Test type         | What it proves                                  | What it cannot prove
------------------|-------------------------------------------------|--------------------------------------------------
Unit tests        | Individual functions return correct values      | That calling code uses those values correctly
Integration tests | Components work together pairwise               | Multi-hop and emergent system behavior
Property tests    | Invariants hold for all generated inputs        | That real-world inputs exercise those invariants
Fuzz tests        | Parser doesn't crash on malformed input         | Semantic correctness of valid input handling
E2E dogfooding    | The full stack works under realistic conditions | That your assertions actually detect regressions
Mutation tests    | Your assertions detect real code changes        | Protocol-level emergent behavior

The a2a-rust experience: After building 1,200+ unit/integration/property/fuzz tests, an exhaustive 72-test E2E dogfood suite that caught 36 real bugs across 8 passes, and achieving full green CI — mutation testing still found gaps. Tests that looked comprehensive were silently missing assertions on return values, boundary conditions, and delegation correctness. The suite was green, but mutants survived because no test verified the specific behavior being mutated.

This is why mutation testing is a required quality gate: it is the only technique that measures test effectiveness rather than test existence. Every other technique answers "does the code work?" — mutation testing answers "would the tests catch it if the code broke?"

Mutation Testing

Mutation testing is the final, critical layer of test quality assurance. While unit tests verify correctness and fuzz tests find edge cases, mutation testing answers a fundamentally different question: do your tests actually detect real bugs?

A mutant is a small, deliberate code change — replacing + with -, flipping true to false, returning a default value instead of a computed one. If the test suite still passes after a mutation, there is a gap: a real bug in that exact location would go undetected.

Why This Matters at Scale

At multi-data-center deployment scales, the bugs that slip through traditional testing are precisely the kind that mutation testing catches:

  • Off-by-one errors in pagination, retry logic, and timeout calculations
  • Swapped operands in status comparisons (e.g., == vs != on task state)
  • Missing boundary checks where a default return looks plausible
  • Dead code paths where a branch is never exercised by any test

These are the subtle, semantic correctness issues that only manifest under load, across network partitions, or during multi-hop agent orchestration — exactly the conditions that are hardest to reproduce in staging.

What Mutation Testing Found in a2a-rust

Even with 1,255 passing tests, 72 E2E dogfood tests, property tests, and fuzz targets — all green — the first mutation testing run surfaced gaps across every crate:

  • Delegation methods returning () instead of forwarding calls (e.g., Arc<T> metrics delegation, OTel instrument recording)
  • Hash function correctness — replacing ^= with |= in FNV-1a was undetected because no test verified specific hash values
  • Date arithmetic — swapping / with % or * in HTTP date formatting was undetected because the only test used epoch (where all fields are 0)
  • Rate limiter logic — replacing > with >= in window checks, && with || in cleanup conditions, and / with % in window calculations
  • Builder patterns — builder() returning Default::default() instead of a functional builder was undetected because existing tests used the built result without verifying builder-specific behavior
  • Debug formatting — fmt returning Ok(Default::default()) (empty string) instead of the real debug output
  • Cancellation token — is_cancelled() returning a hardcoded true or false instead of delegating to the actual token

Every one of these mutations represents a real bug that could have been introduced without any test catching it. The fix in each case was straightforward: add a test that asserts the specific behavior.
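The FNV-1a gap is instructive: operator mutations in a hash function survive unless some test pins exact output values. A standalone FNV-1a 64-bit sketch (illustrative, not the library's source) together with the kind of value-pinning test that kills the ^= → |= mutant:

```rust
const FNV_OFFSET: u64 = 0xcbf2_9ce4_8422_2325;
const FNV_PRIME: u64 = 0x0000_0100_0000_01b3;

/// FNV-1a, 64-bit: xor each byte into the state, then multiply by the prime.
pub fn fnv1a_64(data: &[u8]) -> u64 {
    let mut hash = FNV_OFFSET;
    for &b in data {
        hash ^= u64::from(b); // a |= here survives unless outputs are pinned
        hash = hash.wrapping_mul(FNV_PRIME);
    }
    hash
}
```

A test asserting only that two different inputs hash differently would not catch the mutation; asserting specific hash values does.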

Running Mutation Tests

# Install cargo-mutants (one-time setup)
cargo install cargo-mutants

# Full mutation sweep (all library crates)
cargo mutants --workspace

# Test a specific crate
cargo mutants -p a2a-protocol-types

# Test a specific file
cargo mutants --file crates/a2a-types/src/task.rs

# Dry-run: list all mutants without running tests
cargo mutants --list --workspace

Configuration

Mutation testing is configured via .cargo/mutants.toml (the file cargo-mutants reads) at the workspace root:

# Which files to mutate
examine_globs = [
    "crates/a2a-types/src/**/*.rs",
    "crates/a2a-client/src/**/*.rs",
    "crates/a2a-server/src/**/*.rs",
    "crates/a2a-sdk/src/**/*.rs",
]

# Skip unproductive mutations (re-exports, generated code, formatting)
exclude_globs = ["**/mod.rs", "crates/*/src/proto/**"]
exclude_re = ["fmt$", "^tracing::", "^log::"]

CI Integration

  • Nightly: A full mutation sweep runs every night. Any surviving mutant fails the build and is reported as a CI artifact.
  • PR gate: An incremental sweep runs on changed files only, so PR feedback is fast while still enforcing zero surviving mutants on new/modified code.

Interpreting Results

Found 247 mutants to test
 244 caught   ✓     # Test suite detected the mutation
   0 missed   ✗     # ALERT: test gap — add or strengthen tests
   3 unviable ⊘     # Mutation caused compile error (not a gap)

  • Caught: The test suite correctly detected the mutation. Good.
  • Missed: A real bug in this location would go undetected. Add tests.
  • Unviable: The mutation produced a compile error. Not a test gap.

Target: 100% mutation score (zero missed mutants across all library crates).

Fixing Surviving Mutants

When a mutant survives, cargo mutants prints the exact source location and mutation. For example:

MISSED: crates/a2a-types/src/task.rs:42: replace TaskState::is_terminal -> bool with false

This tells you that replacing the body of is_terminal() with false did not cause any test to fail. The fix is to add a test that asserts is_terminal() returns true for terminal states.
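A minimal sketch of such a killing test, assuming a TaskState enum shaped like the states listed in this guide (the SDK's real enum may have more variants, e.g. Rejected):

```rust
#[derive(Debug, Clone, Copy, PartialEq)]
enum TaskState {
    Submitted,
    Working,
    Completed,
    Failed,
    Canceled,
}

impl TaskState {
    /// Terminal states admit no further transitions.
    fn is_terminal(self) -> bool {
        matches!(
            self,
            TaskState::Completed | TaskState::Failed | TaskState::Canceled
        )
    }
}

fn main() {
    // Assert both polarities: a `-> false` mutant fails the first three
    // assertions, a `-> true` mutant fails the last two.
    assert!(TaskState::Completed.is_terminal());
    assert!(TaskState::Failed.is_terminal());
    assert!(TaskState::Canceled.is_terminal());
    assert!(!TaskState::Submitted.is_terminal());
    assert!(!TaskState::Working.is_terminal());
}
```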

Running the Test Suite

Current status: The workspace has 1,255 passing tests across all crates (unit, integration, property, fuzz, and E2E dogfood).

# All tests
cargo test --workspace

# Specific crate
cargo test -p a2a-protocol-server

# With output
cargo test --workspace -- --nocapture

# Specific test
cargo test test_calculator_executor


Dogfooding: The Agent Team Example

The best way to find bugs in an SDK is to use it yourself — under real conditions, with real complexity, exercising real interaction patterns. Unit tests verify individual functions. Integration tests verify pairwise contracts. But only dogfooding reveals the emergent issues that appear when all the pieces come together.

The agent-team example (examples/agent-team/) is a full-stack dogfood of every a2a-rust capability. It deploys 4 specialized agents that discover each other, delegate work, stream results, and report health — all via the A2A protocol. A comprehensive test suite of 72 E2E tests (79 with optional transports and signing) runs in ~2.5 seconds.

Why Dogfood?

Unit tests and integration tests are necessary but insufficient; no single test type catches everything on its own, so it takes an ensemble of all of them. Here's what each layer catches and misses:

Testing level | What it catches | What it misses
Unit tests | Logic errors in isolated functions | Interaction bugs, serialization mismatches
Integration tests | Pairwise contracts between components | Multi-hop communication, emergent behavior
Property tests | Edge cases in data handling | Protocol flow issues, lifecycle bugs
Fuzz tests | Malformed input handling | Semantic correctness of valid flows
Dogfooding | DX issues, multi-hop bugs, performance surprises, missing features | Weak assertions, dead code paths
Mutation tests | Weak/missing assertions, dead code paths, off-by-one errors, swapped operands | Protocol-level emergent behavior

The critical lesson: After building 1,255 tests across all of the above categories — unit, integration, property, fuzz, and 72 E2E dogfood tests that caught 36 real bugs — the entire suite was green. Every CI check passed. Then we ran mutation testing, and it found gaps in every crate. Tests that looked comprehensive were silently missing assertions on return values, boundary conditions, delegation correctness, and hash function specifics.

Mutation testing fills the gap between "tests pass" and "tests actually detect bugs." A test suite with 100% line coverage can still have a 0% mutation score if every assertion is trivial. Mutation testing is the only technique that directly measures test effectiveness rather than test existence. See Testing Your Agent — Mutation Testing for setup and usage.

Dogfooding operates at the highest level of the testing pyramid. It catches the class of bugs that live in the seams between components — bugs that only manifest when a real application exercises the full stack in realistic patterns. But even dogfooding cannot verify that your assertions are strong enough to catch regressions — that's where mutation testing completes the picture.

The Agent Team Architecture

┌─────────────────────────────────────────────────────────────┐
│                     E2E Test Harness                        │
│              (72 tests, ~2500ms total)                      │
└─────┬───────────┬───────────┬───────────┬───────────────────┘
      │           │           │           │
      ▼           ▼           ▼           ▼
┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐
│  Code    │ │  Build   │ │  Health  │ │ Coordin- │
│ Analyzer │ │ Monitor  │ │ Monitor  │ │   ator   │
│ JSON-RPC │ │  REST    │ │ JSON-RPC │ │  REST    │
└──────────┘ └──────────┘ └──────────┘ └─────┬────┘
                                             │ A2A
                          ┌──────────────────┼──────────────┐
                          │                  │              │
                          ▼                  ▼              ▼
                     CodeAnalyzer      BuildMonitor   HealthMonitor
                     (send_message)    (send_message) (send_message)
                                                           │
                                                      ┌────┴────┐
                                                      ▼         ▼
                                                 list_tasks  list_tasks
                                                (all agents) (all agents)

Each agent exercises different SDK capabilities:

Agent | Transport | Capabilities Exercised
CodeAnalyzer | JSON-RPC | Streaming artifacts with append mode, multi-part output (text + JSON data), cancellation token checking
BuildMonitor | REST | Full task lifecycle (Completed/Failed/Canceled), streaming phase output, cancel executor override, push notification support
HealthMonitor | JSON-RPC | Multi-part input (text + data), agent-to-agent discovery via list_tasks, push notification support
Coordinator | REST | A2A client calls to other agents, result aggregation, multi-level orchestration

SDK Features Exercised

The agent team exercises 35+ distinct SDK features in a single run:

  • AgentExecutor trait (4 implementations)
  • RequestHandlerBuilder (all options: timeout, queue capacity, max streams, metrics, interceptors)
  • JsonRpcDispatcher and RestDispatcher
  • WebSocketDispatcher (with websocket feature flag)
  • ClientBuilder (both JSON-RPC and REST protocol bindings)
  • Synchronous SendMessage and streaming SendStreamingMessage
  • EventStream consumer (SSE event loop)
  • GetTask and ListTasks with pagination and status filtering
  • CancelTask with custom executor override
  • Push notification config CRUD (set_push_config, list_push_configs)
  • HttpPushSender delivery with webhook receiver
  • ServerInterceptor (audit logging + bearer token auth checking)
  • Custom Metrics observer (request/response/error counting)
  • AgentCard discovery (both transports)
  • Multi-part messages (Part::text + Part::data)
  • Artifact streaming with append and last_chunk flags
  • All TaskState transitions (Submitted, Working, Completed, Failed, Canceled)
  • CancellationToken cooperative checking
  • return_immediately mode
  • Request metadata passthrough
  • Context ID continuation across messages
  • Concurrent GetTask during active streams
  • SubscribeToTask resubscribe (both REST and JSON-RPC)
  • boxed_future and EventEmitter helpers
  • Concurrent streams on same agent
  • history_length configuration
  • Part::file_bytes (binary file content)
  • TenantAwareInMemoryTaskStore isolation
  • TenantContext::scope task-local threading
  • WebSocket transport (SendMessage + streaming) — with websocket feature
  • Batch JSON-RPC (single, multi, empty, mixed, streaming rejection)
  • Real auth rejection (interceptor short-circuit)
  • GetExtendedAgentCard via JSON-RPC
  • DynamicAgentCardHandler (runtime-generated cards)
  • Agent card HTTP caching (ETag + 304 Not Modified)
  • Backpressure / lagged event queue (capacity=2)

Modular Example Structure

examples/agent-team/src/
├── main.rs                      # Thin orchestrator (~400 lines)
├── executors/
│   ├── mod.rs                   # Re-exports
│   ├── code_analyzer.rs         # CodeAnalyzer executor
│   ├── build_monitor.rs         # BuildMonitor executor
│   ├── health_monitor.rs        # HealthMonitor executor
│   └── coordinator.rs           # Coordinator executor (A2A client calls)
├── cards.rs                     # Agent card builders
├── helpers.rs                   # Shared helpers (make_send_params, EventEmitter)
├── infrastructure.rs            # Metrics, interceptors, webhook, server setup
└── tests/
    ├── mod.rs                   # TestResult, TestContext
    ├── basic.rs                 # Tests 1-10: core send/stream paths
    ├── lifecycle.rs             # Tests 11-20: orchestration, cancel, agent cards
    ├── edge_cases.rs            # Tests 21-30: errors, concurrency, metrics
    ├── stress.rs                # Tests 31-40: stress, durability, event ordering
    ├── dogfood.rs               # Tests 41-50: SDK gaps, regressions, edge cases
    ├── transport.rs             # Tests 51-58: WebSocket, gRPC, multi-tenancy
    └── coverage_gaps.rs         # Tests 61-79: Batch JSON-RPC, auth, cards, caching, backpressure, signing

Running the Agent Team

# Basic run (all output to stdout)
cargo run -p agent-team

# With WebSocket tests (tests 51-52)
cargo run -p agent-team --features websocket

# With structured logging
RUST_LOG=debug cargo run -p agent-team --features tracing

# With all optional features
cargo run -p agent-team --features "websocket,tracing"

Expected output:

╔══════════════════════════════════════════════════════════════╗
║     A2A Agent Team — Full SDK Dogfood & E2E Test Suite     ║
╚══════════════════════════════════════════════════════════════╝

Agent [CodeAnalyzer]  JSON-RPC on http://127.0.0.1:XXXXX
Agent [BuildMonitor]  REST     on http://127.0.0.1:XXXXX
Agent [HealthMonitor] JSON-RPC on http://127.0.0.1:XXXXX
Agent [Coordinator]   REST     on http://127.0.0.1:XXXXX

...72 tests...

║ Total: 72 | Passed: 72 | Failed: 0 | Time: ~2500ms

Lessons for Your Own Agents

  1. Test all four transports. JSON-RPC, REST, WebSocket, and gRPC have different serialization and framing paths. A bug in one may not exist in the others.
  2. Test multi-hop flows. Agent A calling Agent B is different from a client calling Agent A. The interaction patterns surface different bugs.
  3. Test failure paths explicitly. The agent team tests TaskState::Failed and TaskState::Canceled alongside Completed. Happy-path-only testing misses lifecycle bugs.
  4. Use real metrics and interceptors. They exercise code paths that exist in the handler but are invisible to pure request/response tests.
  5. Deploy multiple agents simultaneously. Concurrent servers with different configurations stress connection pooling, port binding, and resource cleanup in ways single-server tests cannot.
  6. Test return_immediately mode. Client config must actually propagate to the server — this was a real bug caught only by dogfooding.
  7. Test tenant isolation. Multi-tenancy bugs are subtle — same task IDs in different tenants should not collide.

Open Issues & Future Work — All Resolved

All architecture, ergonomics, observability, performance, and durability issues from passes 1–8 have been resolved (36 bugs across 8 passes). All proposed beyond-spec features have been implemented:

Feature | Location
OpenTelemetry integration | crates/a2a-server/src/otel/ — OtelMetrics, OtelMetricsBuilder, init_otlp_pipeline (otel feature)
Connection pooling metrics | crates/a2a-server/src/metrics.rs — ConnectionPoolStats, on_connection_pool_stats
Hot-reload agent cards | crates/a2a-server/src/agent_card/hot_reload.rs — file polling + SIGHUP reload
Store migration tooling | crates/a2a-server/src/store/migration.rs — MigrationRunner, V1–V3 migrations
Per-tenant configuration | crates/a2a-server/src/tenant_config.rs — PerTenantConfig, TenantLimits
TenantResolver trait | crates/a2a-server/src/tenant_resolver.rs — header, bearer token, path segment strategies
Agent card signing E2E | examples/agent-team/src/tests/coverage_gaps.rs — test 79 (signing feature)

Dogfooding: Bugs Found & Fixed

Eight dogfooding passes across v0.1.0 and v0.2.0 uncovered 36 real bugs that 1,255 unit tests, integration tests, property tests, and fuzz tests did not catch. All 36 have been fixed.

Summary

Pass | Focus | Bugs | Severity
Pass 1 | Initial dogfood | 3 | 2 Medium, 1 Low
Pass 2 | Hardening audit | 6 | 1 High, 2 Medium, 3 Low
Pass 3 | Stress testing | 1 | 1 High
Pass 4 | SDK regressions | 3 | 2 Critical, 1 Medium
Pass 5 | Concurrency | 4 | 2 High, 1 Medium, 1 Low
Pass 6 | Architecture | 5 | 1 Critical, 1 High, 3 Medium
Pass 7 | Deep dogfood | 9 | 1 Critical, 2 High, 4 Medium, 2 Low
Pass 8 | Performance & security | 5 | 1 Critical, 3 Medium, 1 Low

By Severity

Severity | Count | Examples
Critical | 5 | Timeout retry broken (#32), push config DoS (#26), placeholder URLs (#11, #12, #18)
High | 6 | Concurrent SSE (#9), return_immediately ignored (#10), TOCTOU race (#15), SSRF bypass (#25)
Medium | 16 | REST field stripping (#1), query encoding (#19), path traversal (#35)
Low | 9 | Metrics hooks (#2, #6, #7), gRPC error context (#36)

Configuration Hardening

Extracted all hardcoded constants into configurable structs during passes 2-7:

Struct | Fields | Where
DispatchConfig | max_request_body_size, body_read_timeout, max_query_string_length | Both dispatchers
PushRetryPolicy | max_attempts, backoff | HttpPushSender
HandlerLimits | max_id_length, max_metadata_size, max_cancellation_tokens, max_token_age | RequestHandler

Pass 1: Initial Dogfood (3 bugs)

Bug 1: REST Transport Strips Required Fields from Push Config Body

Severity: Medium | Component: Client REST transport + Server dispatch

The client's REST transport extracts path parameters from the serialized JSON body to interpolate URL templates. For CreateTaskPushNotificationConfig, the route is /tasks/{taskId}/pushNotificationConfigs, so the transport extracts taskId from the body and removes it. But the server handler requires taskId in the body.

Why tests missed it: Unit tests exercise the client transport and the server dispatch in isolation. The bug only appears when they interact.

Fix: Server-side handle_set_push_config injects taskId from the URL path back into the body before parsing.

Bug 2: on_response Metrics Hook Never Called

Severity: Low | Component: RequestHandler

Metrics::on_response showed 0 responses after 17 successful requests. The hook was defined but never called in any handler method.

Fix: Added on_request()/on_response() calls to all 10 handler methods.

Bug 3: Protocol Binding Mismatch Produces Confusing Errors

Severity: Low | Component: Client JSON-RPC transport

When a JSON-RPC client called a REST-only server, the error was an opaque parsing failure rather than "wrong protocol binding."

Fix: (1) New ClientError::ProtocolBindingMismatch variant, (2) JSON-RPC transport detects non-JSON-RPC responses, (3) HealthMonitor uses agent card discovery to select correct transport.


Pass 2: Hardening Audit (6 bugs)

Bug 4: list_push_configs REST Response Format Mismatch

Severity: Medium | Component: Both dispatchers

Dispatchers serialized results as raw Vec<TaskPushNotificationConfig>, but the client expected ListPushConfigsResponse { configs, next_page_token }.

Fix: Both dispatchers wrap results in ListPushConfigsResponse.

Bug 5: Push Notification Test Task ID Mismatch

Severity: Medium | Component: Test design

Push config was registered on Task A, but a subsequent send_message created Task B. No config existed for Task B.

Fix: Restructured as push config CRUD lifecycle test.

Bug 6: on_error Metrics Hook Never Fired

Severity: Low | Component: RequestHandler

All handler error paths used ? to propagate without invoking on_error.

Fix: All 10 handler methods restructured with async block + match on on_response/on_error.

Bug 7: on_queue_depth_change Metrics Hook Never Fired

Severity: Low | Component: EventQueueManager

EventQueueManager had no access to the Metrics object.

Fix: Added Arc<dyn Metrics> to EventQueueManager, wired from builder.

Bug 8: JsonRpcDispatcher Does Not Serve Agent Cards

Severity: Medium | Component: JsonRpcDispatcher

resolve_agent_card() failed for JSON-RPC agents because only RestDispatcher served /.well-known/agent.json.

Fix: Added StaticAgentCardHandler to JsonRpcDispatcher.

Bug 9: SubscribeToTask Fails with Concurrent SSE Streams

Severity: High | Component: EventQueueManager

mpsc channels allow only a single reader. Once stream_message took the reader, subscribe_to_task failed with "no active event queue."

Fix: Redesigned from mpsc to tokio::sync::broadcast channels. subscribe() creates additional readers from the same sender.


Pass 3: Stress Testing (1 bug)

Bug 10: Client Ignores return_immediately Config

Severity: High | Component: Client send_message()

ClientBuilder::with_return_immediately(true) stored the flag in ClientConfig but send_message() never injected it into MessageSendParams.configuration. The server never saw the flag, so tasks always ran to completion.

Why tests missed it: The server-side return_immediately logic was correct. The bug was in client-to-server config propagation — a seam that only E2E testing exercises.

Fix: Added apply_client_config() that merges client-level return_immediately, history_length, and accepted_output_modes into params before sending. Per-request values take precedence over client defaults.
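The precedence rule can be sketched with Option::or, where a Some on the request side shadows the client-level default (the type and field names below are illustrative, not the SDK's):

```rust
/// Illustrative send-configuration subset: per-request values take
/// precedence over client-level defaults.
#[derive(Default, Clone)]
struct SendConfig {
    return_immediately: Option<bool>,
    history_length: Option<u32>,
}

/// Merge client defaults into request params without overriding
/// anything the caller set explicitly.
fn apply_client_config(request: SendConfig, client_default: &SendConfig) -> SendConfig {
    SendConfig {
        // Option::or keeps the request's value when it is Some.
        return_immediately: request
            .return_immediately
            .or(client_default.return_immediately),
        history_length: request.history_length.or(client_default.history_length),
    }
}

fn main() {
    let client = SendConfig { return_immediately: Some(true), history_length: Some(10) };

    // No per-request override: the client default flows through to the wire.
    let merged = apply_client_config(SendConfig::default(), &client);
    assert_eq!(merged.return_immediately, Some(true));

    // A per-request value wins over the client default.
    let req = SendConfig { return_immediately: Some(false), history_length: None };
    let merged = apply_client_config(req, &client);
    assert_eq!(merged.return_immediately, Some(false));
    assert_eq!(merged.history_length, Some(10));
}
```

The original bug was precisely a missing merge step of this shape: the flag lived in the client config but was never copied into the outgoing params.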


Pass 4: SDK Regression Testing (3 bugs)

Bug 11: JSON-RPC ListTaskPushNotificationConfigs Param Type Mismatch

Severity: Critical | Component: JsonRpcDispatcher

The JSON-RPC dispatcher parsed ListTaskPushNotificationConfigs params as TaskIdParams (field id), but the client sends ListPushConfigsParams (field task_id). This caused silent deserialization failure — push config listing via JSON-RPC was completely broken. REST worked because it uses path-based routing.

Why tests missed it: Previous push config tests used REST transport or tested create/get/delete but not list. The JSON-RPC list path was never exercised end-to-end.

Fix: Changed parse_params::<TaskIdParams> to parse_params::<ListPushConfigsParams> and p.id to p.task_id in jsonrpc.rs.

Bug 12: Agent Card URLs Set to "http://placeholder"

Severity: Critical | Component: Agent-team example

Agent cards were constructed with code_analyzer_card("http://placeholder") before the server bound to a port. The actual address was only known after TcpListener::bind(). This meant /.well-known/agent.json served a card with a URL that didn't match the actual server address.

Why tests missed it: Tests used URLs from TestContext (the real bound addresses), not from the agent card. Only resolve_agent_card() tests would have caught this, and those didn't exist.

Fix: Introduced bind_listener() that pre-binds TCP listeners to get addresses before handler construction. Cards are now built with correct URLs.

Bug 13: Push Notification Event Classification Broken

Severity: Medium | Component: Agent-team webhook receiver

The webhook receiver classified events by checking value.get("status") and value.get("artifact"), but StreamResponse serializes as {"statusUpdate": {...}} / {"artifactUpdate": {...}} (camelCase variant names). All push events were classified as "Unknown".

Fix: Check statusUpdate/artifactUpdate/task instead.


Pass 5: Hardening & Concurrency Audit (4 bugs)

Bug 14: Lock Poisoning Silently Masked in InMemoryCredentialsStore

Severity: High | Component: InMemoryCredentialsStore

All three CredentialsStore methods (get, set, remove) used .ok()? or if let Ok(...) to silently ignore RwLock poisoning. If a thread panicked while holding the lock, subsequent calls would return None (for get) or silently skip the operation (for set/remove), masking the underlying bug.

Why tests missed it: Tests rarely exercise lock poisoning: a #[test] function that panics simply fails that test, and nothing afterward observes the poisoned lock.

Fix: Changed all three methods to .expect("credentials store lock poisoned") for fail-fast behavior. Added documentation explaining the poisoning semantics.
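The failure mode is easy to reproduce with std alone. This sketch (names illustrative) poisons an RwLock by panicking while holding the write guard, then shows that readers observe an Err — exactly the value that .ok()-style code would silently swallow:

```rust
use std::sync::{Arc, RwLock};
use std::thread;

/// Poison a lock by panicking while holding it, then report whether a
/// subsequent reader can observe the poisoning (fail-fast behavior).
fn demo_poisoning() -> bool {
    let store: Arc<RwLock<Vec<String>>> = Arc::new(RwLock::new(Vec::new()));

    // A thread panics while holding the write guard -> lock is poisoned.
    let poisoner = Arc::clone(&store);
    let _ = thread::spawn(move || {
        let _guard = poisoner.write().unwrap();
        panic!("simulated bug while holding the lock");
    })
    .join();

    // `.ok()`-style handling would turn this Err into "no credentials",
    // masking the underlying bug. `.expect(...)` would surface it instead.
    store.read().is_err() && store.is_poisoned()
}

fn main() {
    assert!(demo_poisoning());
}
```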

Bug 15: Rate Limiter TOCTOU Race on Window Advance

Severity: High | Component: RateLimitInterceptor

When two concurrent requests arrived at a window boundary, both could see the old window_start, and both would store count = 1 for the new window. This let 2N requests through per window instead of N.

The race: Thread A loads window_start (old), Thread B loads window_start (old), Thread A stores new window_start + count=1, Thread B stores new window_start + count=1 (clobbering A's count).

Why tests missed it: The single-threaded test executor doesn't interleave atomic operations. The race only manifests under real concurrent load.

Fix: Replaced the non-atomic load-check-store sequence with a compare_exchange (CAS) loop. Only one thread wins the CAS to reset the window; others retry and see the updated window on the next iteration.
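A simplified sketch of the CAS pattern, reduced to a single fixed-window bucket (names and state layout are illustrative; the real interceptor handles more cases, and a production version might pack window and count into one atomic to close the small post-CAS gap):

```rust
use std::sync::atomic::{AtomicU64, Ordering};

/// Fixed-window rate-limit bucket. Only one thread can win the
/// compare_exchange that advances the window, so the double-reset race
/// (two threads each storing count = 1) cannot happen.
struct Window {
    window_start: AtomicU64, // coarse timestamp of the current window
    count: AtomicU64,        // requests admitted in the current window
}

impl Window {
    fn check(&self, now: u64, window_len: u64, limit: u64) -> bool {
        loop {
            let start = self.window_start.load(Ordering::Acquire);
            if now < start + window_len {
                // Still inside the window: admit if under the limit.
                return self.count.fetch_add(1, Ordering::AcqRel) < limit;
            }
            // Window expired: exactly one thread wins this CAS.
            match self
                .window_start
                .compare_exchange(start, now, Ordering::AcqRel, Ordering::Acquire)
            {
                Ok(_) => {
                    self.count.store(1, Ordering::Release); // this request is the first
                    return limit > 0;
                }
                Err(_) => continue, // lost the race: re-read the new window and retry
            }
        }
    }
}

fn main() {
    let w = Window { window_start: AtomicU64::new(0), count: AtomicU64::new(0) };
    assert!(w.check(1, 10, 2));  // first request in the window
    assert!(w.check(2, 10, 2));  // second request, at the limit
    assert!(!w.check(3, 10, 2)); // over the limit: rejected
    assert!(w.check(20, 10, 2)); // new window: counter reset by the CAS winner
}
```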

Bug 16: Rate Limiter Unbounded Bucket Growth

Severity: Medium | Component: RateLimitInterceptor

The buckets HashMap grew without bound. Each unique caller key created a CallerBucket that was never removed, even after the caller departed. In a service with many transient callers (e.g., serverless functions), this would leak memory indefinitely.

Why tests missed it: Tests use a small fixed set of callers. The leak only manifests with high caller churn over time.

Fix: Added amortized stale-bucket cleanup (every 256 check() calls). Buckets whose window_start is more than one window old are evicted.
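The amortized sweep can be sketched with HashMap::retain; the 256-call cadence mirrors the description above, everything else (names, bucket layout, eviction threshold) is illustrative:

```rust
use std::collections::HashMap;

/// Amortized stale-bucket cleanup sketch: every 256th call sweeps the
/// map and evicts callers whose window_start is more than one window old.
struct Buckets {
    buckets: HashMap<String, u64>, // caller key -> window_start
    calls: u64,
}

impl Buckets {
    const CLEANUP_EVERY: u64 = 256;

    fn touch(&mut self, key: &str, now: u64, window_len: u64) {
        self.buckets.insert(key.to_string(), now);
        self.calls += 1;
        if self.calls % Self::CLEANUP_EVERY == 0 {
            // O(n) sweep, but its cost is amortized across 256 calls.
            self.buckets
                .retain(|_, start| now.saturating_sub(*start) <= window_len);
        }
    }
}

fn main() {
    let mut b = Buckets { buckets: HashMap::new(), calls: 0 };
    b.touch("transient-caller", 0, 10); // departs and never returns
    for i in 0..256 {
        b.touch("steady-caller", 100 + i, 10); // keeps its window fresh
    }
    // The departed caller's bucket was evicted by the periodic sweep.
    assert!(!b.buckets.contains_key("transient-caller"));
    assert!(b.buckets.contains_key("steady-caller"));
}
```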

Bug 17: No Protocol Version Compatibility Warning

Severity: Low | Component: ClientBuilder

ClientBuilder::from_card() silently accepted any protocol_version from the agent card, including incompatible versions (e.g., "2.0.0" when the client supports "1.x"). Users would only discover the mismatch through obscure deserialization errors.

Fix: Added protocol version major-version check in from_card(). When the agent's major version differs from the client's supported version, a tracing::warn! is emitted.


Pass 6: Architecture Audit (5 bugs)

Bug 18: gRPC Agent Card Still Uses Placeholder URL

Severity: Critical | Component: Agent-team example (gRPC path)

The gRPC CodeAnalyzer agent was still constructed with grpc_analyzer_card("http://placeholder") — the exact same Bug #12 pattern that was fixed for HTTP agents in Pass 4. The gRPC path was missed because it was behind a #[cfg(feature = "grpc")] gate.

Why tests missed it: gRPC tests used the address from serve_grpc() return value, not from the agent card. Agent card discovery tests only ran for HTTP agents.

Fix: Added GrpcDispatcher::serve_with_listener() to accept a pre-bound TcpListener. The agent-team example now pre-binds for gRPC the same way it does for HTTP, ensuring the agent card URL is correct from the start.

Bug 19: REST Transport Query Strings Not URL-Encoded

Severity: High | Component: Client REST transport

build_query_string() concatenated parameter values directly into query strings without percent-encoding. Values containing &, =, spaces, or other special characters would corrupt the query string, causing server-side parsing failures or parameter injection.

Why tests missed it: All test parameters used simple alphanumeric values that don't need encoding.

Fix: Added encode_query_value() implementing RFC 3986 percent-encoding for all non-unreserved characters. Both keys and values are now encoded.
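A minimal sketch of the encoding rule (illustrative, not the SDK source): RFC 3986 unreserved characters pass through, and every other byte of the UTF-8 encoding is percent-escaped.

```rust
/// Percent-encode a query value per RFC 3986: only unreserved
/// characters (ALPHA / DIGIT / "-" / "." / "_" / "~") pass through.
fn encode_query_value(value: &str) -> String {
    let mut out = String::new();
    for byte in value.bytes() {
        match byte {
            b'A'..=b'Z' | b'a'..=b'z' | b'0'..=b'9' | b'-' | b'.' | b'_' | b'~' => {
                out.push(byte as char)
            }
            _ => out.push_str(&format!("%{:02X}", byte)),
        }
    }
    out
}

fn main() {
    // Reserved characters that would corrupt a query string are escaped.
    assert_eq!(encode_query_value("a&b=c d"), "a%26b%3Dc%20d");
    // Non-ASCII is encoded byte-by-byte from its UTF-8 form.
    assert_eq!(encode_query_value("é"), "%C3%A9");
    // Unreserved characters are left alone.
    assert_eq!(encode_query_value("safe-value_1.0~"), "safe-value_1.0~");
}
```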

Bug 20: WebSocket Stream Termination Uses Fragile String Matching

Severity: Medium | Component: Client WebSocket transport

The WebSocket stream reader detected stream completion by checking text.contains("stream_complete"). This would false-positive on any payload containing that substring (e.g., a task whose output mentions "stream_complete") and would miss terminal status updates that don't contain that exact string.

Fix: Replaced with is_stream_terminal() that deserializes the JSON-RPC frame and checks for terminal task states (completed, failed, canceled, rejected) or the stream_complete sentinel in the result object.

Bug 21: Background Event Processor Silently Drops Store Write Failures

Severity: High | Component: Server RequestHandler background processor

In streaming mode, the background event processor called let _ = task_store.save(...).await; at 5 call sites, silently discarding any store errors. If the store failed (disk full, permission denied), task state would diverge: the event queue showed completion but the persistent store didn't record it.

Why tests missed it: In-memory stores don't fail. The bug only manifests with persistent backends under storage pressure.

Fix: All 5 sites now use if let Err(_e) = task_store.save(...).await { trace_error!(...) } to surface failures via structured logging.

Bug 22: Metadata Size Validation Bypass via Serialization Failure

Severity: Medium | Component: Server RequestHandler messaging

Metadata size was measured with serde_json::to_string(meta).map(|s| s.len()).unwrap_or(0). If serialization failed, the size defaulted to 0, bypassing the size limit entirely.

Fix: Changed unwrap_or(0) to map_err(|_| ServerError::InvalidParams("metadata is not serializable")), rejecting unserializable metadata outright.


Pass 7: Deep Dogfood (9 bugs)

Bug 23: Graceful Shutdown Hangs on Executor Cleanup

Severity: High | Component: RequestHandler

shutdown() called executor.on_shutdown().await with no timeout. If an executor's cleanup routine blocked indefinitely, the entire shutdown process would hang.

Fix: Both shutdown() and shutdown_with_timeout() now bound the executor cleanup call with a timeout (10 seconds for shutdown(), the provided timeout for shutdown_with_timeout()).

Bug 24: Push Notification Body Clone Per Retry Attempt

Severity: Medium | Component: HttpPushSender

body_bytes.clone() inside the retry loop allocated a full copy of the serialized event body for every retry attempt. For large events with 3 retries, this caused 3 unnecessary heap allocations.

Fix: Changed body_bytes from Vec<u8> to Bytes (reference-counted). Clone is now an atomic reference count increment instead of a memory copy.

Bug 25: Webhook URL Missing Scheme Validation

Severity: High | Component: HttpPushSender

validate_webhook_url() checked for private IPs and hostnames but did not validate the URL scheme. URLs like ftp://evil.com/hook or file:///etc/passwd bypassed all SSRF validation.

Fix: Added explicit scheme check requiring http:// or https://. Unknown schemes and schemeless URLs are now rejected with a descriptive error.
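The allow-list reduces to a case-insensitive prefix check that must run before any host or IP validation; a sketch (function name is illustrative):

```rust
/// Webhook URLs must be http(s) before any further SSRF validation
/// (private-IP checks, DNS resolution, etc.) is worth running.
fn has_allowed_scheme(url: &str) -> bool {
    let lower = url.to_ascii_lowercase();
    lower.starts_with("http://") || lower.starts_with("https://")
}

fn main() {
    assert!(has_allowed_scheme("https://hooks.example.com/a2a"));
    assert!(has_allowed_scheme("HTTP://hooks.example.com/a2a")); // schemes are case-insensitive
    assert!(!has_allowed_scheme("ftp://evil.com/hook"));         // the Bug 25 bypass
    assert!(!has_allowed_scheme("file:///etc/passwd"));
    assert!(!has_allowed_scheme("hooks.example.com/a2a"));       // schemeless
}
```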

Bug 26: Push Config Store Unbounded Global Growth (DoS)

Severity: Critical | Component: InMemoryPushConfigStore

The per-task config limit (100) prevented excessive configs per task, but there was no global limit. An attacker could create millions of tasks with 100 configs each, exhausting memory.

Fix: Added max_total_configs field (default 100,000) with with_max_total_configs() builder. The global check runs before the per-task check in set().
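The layered check can be sketched as a global cap tested before the per-task cap; the struct, limits, and error strings below are illustrative, not the SDK's:

```rust
use std::collections::HashMap;

/// Two-level capacity guard: the global cap bounds total memory even
/// when an attacker spreads configs across many tasks.
struct PushConfigStore {
    configs: HashMap<String, Vec<String>>, // task id -> config ids
    max_per_task: usize,
    max_total: usize,
}

impl PushConfigStore {
    fn total(&self) -> usize {
        self.configs.values().map(Vec::len).sum()
    }

    fn set(&mut self, task_id: &str, config_id: String) -> Result<(), &'static str> {
        // Global check runs first: this is the DoS guard.
        if self.total() >= self.max_total {
            return Err("global push config limit reached");
        }
        let entry = self.configs.entry(task_id.to_string()).or_default();
        if entry.len() >= self.max_per_task {
            return Err("per-task push config limit reached");
        }
        entry.push(config_id);
        Ok(())
    }
}

fn main() {
    let mut store = PushConfigStore {
        configs: HashMap::new(),
        max_per_task: 2,
        max_total: 3,
    };
    assert!(store.set("t1", "a".into()).is_ok());
    assert!(store.set("t1", "b".into()).is_ok());
    assert!(store.set("t1", "c".into()).is_err()); // per-task cap
    assert!(store.set("t2", "d".into()).is_ok());
    assert!(store.set("t3", "e".into()).is_err()); // global cap: 3 already stored
}
```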

Bug 27: gRPC Error Code Mapping Incomplete

Severity: Medium | Component: Client gRPC transport

Only 4 tonic status codes were mapped to A2A error codes. Unauthenticated, PermissionDenied, ResourceExhausted, DeadlineExceeded, Cancelled, and Unavailable all silently mapped to InternalError, losing semantic information.

Fix: Added explicit mappings for 6 additional gRPC status codes.

Bug 28: BuildMonitor Cancel Race Condition

Severity: Medium | Component: Agent-team example

BuildMonitorExecutor::cancel() unconditionally emitted TaskState::Canceled without checking if the task had already reached a terminal state. If the executor completed between the handler checking and the cancel arriving, this caused an invalid Completed → Canceled transition.

Fix: Added ctx.cancellation_token.is_cancelled() guard before emitting cancel status.

Bug 29: CodeAnalyzer Missing Cancellation Re-Check

Severity: Low | Component: Agent-team example

CodeAnalyzerExecutor only checked cancellation once (before artifact emission). Multiple artifact emissions happened without re-checking, meaning cancellation between artifacts was delayed.

Fix: Added cancellation re-check between the two artifact emission phases.

Bug 30: Accept Loop Breaks Kill Servers

Severity: Medium | Component: Agent-team infrastructure

All three server startup functions (serve_jsonrpc, serve_rest, start_webhook_server) used Err(_) => break in their accept loops. A single transient accept error (e.g., EMFILE) would permanently kill the server.

Fix: Changed to Err(_) => continue for JSON-RPC and REST servers, and eprintln! + continue for the webhook receiver.

Bug 31: Coordinator Silent Client Build Failure

Severity: Low | Component: Agent-team example

When ClientBuilder::build() failed for an agent URL, the error was silently discarded (if let Ok(client) = ...). If all agents failed, the coordinator would run with an empty client map, producing confusing "Unknown command" errors.

Fix: Changed to match with Err(e) => eprintln!(...) to log the failing agent name, URL, and error.


Pass 8: Performance & Security (5 bugs)

Bug 32: Timeout Errors Misclassified as Transport (CRITICAL)

Severity: Critical | Component: Client REST + JSON-RPC transports

Both RestTransport::execute_request() and JsonRpcTransport::execute_request() mapped tokio::time::timeout errors to ClientError::Transport. Since Transport is explicitly marked non-retryable in is_retryable(), timeouts never triggered retry logic — defeating the entire retry system for the most common transient failure mode.

Why tests missed it: Unit tests checked that timeouts produced errors, but never checked the error variant. The retry integration tests used simulated errors, not real timeouts.

Fix: Changed both transports to use ClientError::Timeout("request timed out"). Added exhaustive retryability classification tests.
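The essence of the fix is that retryability is decided by error variant, so regression tests must assert the variant, not just "an error occurred". A reduced sketch (the real ClientError has many more variants):

```rust
/// Reduced error model: timeouts are transient and retryable, generic
/// transport failures are not.
#[derive(Debug)]
enum ClientError {
    Timeout(String),
    Transport(String),
}

impl ClientError {
    fn is_retryable(&self) -> bool {
        match self {
            ClientError::Timeout(_) => true,    // transient: retry with backoff
            ClientError::Transport(_) => false, // e.g. TLS handshake failure
        }
    }
}

fn main() {
    // The bug: timeouts were constructed as Transport, so this first
    // assertion is exactly the test that was missing.
    assert!(ClientError::Timeout("request timed out".into()).is_retryable());
    assert!(!ClientError::Transport("TLS handshake failed".into()).is_retryable());
}
```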

Bug 33: SSE Parser O(n) Dequeue Performance

Severity: Medium | Component: Client SSE parser

SseParser::next_frame() used Vec::remove(0) which is O(n) because it shifts all remaining elements. With high-throughput streaming (hundreds of events per second), this creates quadratic overhead.

Why tests missed it: Unit tests parse small event counts (1-3 events). The performance issue only manifests with large event queues.

Fix: Replaced Vec<Result<SseFrame, SseParseError>> with VecDeque for O(1) pop_front().
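The data-structure swap in miniature: Vec::remove(0) shifts every remaining element (O(n) per dequeue, quadratic over a long stream), while VecDeque::pop_front is O(1).

```rust
use std::collections::VecDeque;

fn main() {
    // A queue of parsed frame placeholders (u32 stands in for SseFrame).
    let mut queue: VecDeque<u32> = (0..5).collect();

    // O(1) dequeue from the front, no element shifting.
    assert_eq!(queue.pop_front(), Some(0));
    assert_eq!(queue.pop_front(), Some(1));

    // New frames append at the back as bytes arrive from the socket.
    queue.push_back(5);
    assert_eq!(queue.iter().copied().collect::<Vec<_>>(), vec![2, 3, 4, 5]);
}
```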

Bug 34: SSE Parser Silent UTF-8 Data Loss

Severity: Medium | Component: Client SSE parser

Malformed UTF-8 lines were silently discarded (return on from_utf8 failure). When a multi-byte UTF-8 character spans a TCP chunk boundary, the trailing bytes can appear invalid. The entire line would be dropped, causing silent data loss.

Why tests missed it: All test inputs use ASCII. The bug only manifests with non-ASCII content delivered across TCP fragment boundaries.

Fix: Changed to String::from_utf8_lossy() which replaces invalid bytes with U+FFFD instead of dropping the entire line.
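The boundary case is easy to demonstrate with std: "é" is two bytes (0xC3 0xA9), and if a TCP chunk ends after the first byte, strict parsing rejects the whole line while lossy parsing keeps it.

```rust
fn main() {
    // A line whose last character was split across a TCP chunk boundary:
    // only the lead byte 0xC3 of a two-byte UTF-8 sequence arrived.
    let chunk: &[u8] = b"data: caf\xC3";

    // Strict parsing rejects the entire line -> silent data loss.
    assert!(std::str::from_utf8(chunk).is_err());

    // Lossy parsing keeps the line, replacing the broken byte with U+FFFD.
    let lossy = String::from_utf8_lossy(chunk);
    assert_eq!(lossy, "data: caf\u{FFFD}");
}
```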

Bug 35: Double-Encoded Path Traversal Bypass

Severity: Medium | Component: Server REST dispatcher

contains_path_traversal() only decoded one level of percent-encoding. An attacker could use %252E%252E (which decodes to %2E%2E, then to ..) to bypass the check.

Why tests missed it: No test used double-encoded inputs. The existing test only checked raw .. sequences.

Fix: Added a second decoding pass. Added tests for raw, single-encoded, and double-encoded path traversal.
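The two-pass idea can be sketched like this. The `percent_decode` helper below is a minimal illustrative decoder (ASCII input assumed), not the SDK's implementation; the point is that the traversal check must inspect the raw path and both decoding passes:

```rust
// Minimal %XX decoder for illustration only (assumes ASCII input).
fn percent_decode(s: &str) -> String {
    let bytes = s.as_bytes();
    let mut out = String::new();
    let mut i = 0;
    while i < bytes.len() {
        if bytes[i] == b'%' && i + 2 < bytes.len() {
            if let Ok(v) = u8::from_str_radix(&s[i + 1..i + 3], 16) {
                out.push(v as char);
                i += 3;
                continue;
            }
        }
        out.push(bytes[i] as char);
        i += 1;
    }
    out
}

// Check raw, single-decoded, and double-decoded forms for "..".
fn contains_path_traversal(path: &str) -> bool {
    let once = percent_decode(path);
    let twice = percent_decode(&once);
    [path, once.as_str(), twice.as_str()]
        .iter()
        .any(|p| p.contains(".."))
}

fn main() {
    assert!(contains_path_traversal("../secret"));          // raw
    assert!(contains_path_traversal("%2E%2E/secret"));      // single-encoded
    assert!(contains_path_traversal("%252E%252E/secret"));  // double-encoded
    assert!(!contains_path_traversal("/v1/tasks/abc"));
}
```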

Bug 36: gRPC Stream Errors Lose Error Context

Severity: Low | Component: Client gRPC transport

grpc_stream_reader_task mapped gRPC stream errors to generic ClientError::Transport(format!("gRPC stream error: {}", status.message())) instead of using the existing grpc_code_to_error_code() function. This lost the structured error code information (NotFound, InvalidArgument, etc.).

Why tests missed it: gRPC streaming tests check for successful completion, not error paths within the stream.

Fix: Changed to use ClientError::Protocol(A2aError::new(grpc_code_to_error_code(...), ...)) for proper error classification.

Dogfooding: Test Coverage Matrix

The agent team runs 72 E2E tests across 8 test modules (79 with optional transports and signing). All tests pass in ~2.5 seconds.

Tests 1-10: Core Paths (basic.rs)

| # | Test | Transport | What it exercises |
|---|------|-----------|-------------------|
| 1 | sync-jsonrpc-send | JSON-RPC | Synchronous SendMessage, artifact count |
| 2 | streaming-jsonrpc | JSON-RPC | SSE streaming, event ordering, artifact metadata |
| 3 | sync-rest-send | REST | REST synchronous path |
| 4 | streaming-rest | REST | REST SSE streaming |
| 5 | build-failure-path | REST | TaskState::Failed lifecycle |
| 6 | get-task | JSON-RPC | GetTask retrieval by ID |
| 7 | list-tasks | JSON-RPC | ListTasks with pagination token |
| 8 | push-config-crud | REST | Push config create/get/list/delete/verify lifecycle |
| 9 | multi-part-message | JSON-RPC | Text + data parts, HealthMonitor agent-to-agent |
| 10 | agent-to-agent | REST | Coordinator delegates to CodeAnalyzer |

Tests 11-20: Lifecycle (lifecycle.rs)

| # | Test | Transport | What it exercises |
|---|------|-----------|-------------------|
| 11 | full-orchestration | REST | Coordinator -> CodeAnalyzer + BuildMonitor fan-out |
| 12 | health-orchestration | REST | Coordinator -> HealthMonitor -> all agents (3-level) |
| 13 | message-metadata | JSON-RPC | Request metadata passthrough |
| 14 | cancel-task | REST | Mid-stream cancellation via CancelTask |
| 15 | get-nonexistent-task | JSON-RPC | Error: TaskNotFound (-32001) |
| 16 | pagination-walk | JSON-RPC | ListTasks page_size=1, no duplicates |
| 17 | agent-card-discovery | REST | resolve_agent_card on REST endpoint |
| 18 | agent-card-jsonrpc | JSON-RPC | resolve_agent_card on JSON-RPC endpoint |
| 19 | push-not-supported | JSON-RPC | Error: NotSupported (-32003) for no-push agent |
| 20 | cancel-completed | REST | Error: InvalidState for completed task cancel |

Tests 21-30: Edge Cases (edge_cases.rs)

| # | Test | Transport | What it exercises |
|---|------|-----------|-------------------|
| 21 | cancel-nonexistent | REST | Error: TaskNotFound for fake task ID |
| 22 | return-immediately | REST | return_immediately client config propagation |
| 23 | concurrent-requests | JSON-RPC | 5 parallel requests to same agent |
| 24 | empty-parts-rejected | JSON-RPC | Validation: empty message parts |
| 25 | get-task-rest | REST | GetTask via REST transport |
| 26 | list-tasks-rest | REST | ListTasks via REST transport |
| 27 | push-crud-jsonrpc | JSON-RPC | Push config CRUD via JSON-RPC (HealthMonitor) |
| 28 | resubscribe-rest | REST | SubscribeToTask concurrent resubscription |
| 29 | metrics-nonzero | — | All 4 agents have non-zero request counts |
| 30 | error-metrics-tracked | JSON-RPC | Error metric increments on invalid request |

Tests 31-40: Stress & Durability (stress.rs)

| # | Test | Transport | What it exercises |
|---|------|-----------|-------------------|
| 31 | high-concurrency | JSON-RPC | 20 parallel requests to same agent |
| 32 | mixed-transport | Both | REST + JSON-RPC simultaneously |
| 33 | context-continuation | JSON-RPC | Two messages with same context_id |
| 34 | large-payload | JSON-RPC | 64KB text payload processing |
| 35 | stream-with-get-task | REST | GetTask during active SSE stream |
| 36 | push-delivery-e2e | REST | Push config set during streaming task |
| 37 | list-status-filter | JSON-RPC | ListTasks with TaskState::Completed filter |
| 38 | store-durability | JSON-RPC | Create 5 tasks, retrieve all 5 |
| 39 | queue-depth-metrics | — | Cumulative metrics tracking (>20 total requests) |
| 40 | event-ordering | JSON-RPC | Working -> artifacts -> Completed sequence |

Tests 41-50: SDK Dogfood Regressions (dogfood.rs)

| # | Test | Transport | What it exercises |
|---|------|-----------|-------------------|
| 41 | card-url-correct | JSON-RPC | Agent card URL matches actual bound address |
| 42 | card-skills-valid | Both | All 4 agent cards have name, description, version, skills, interface |
| 43 | push-list-regression | JSON-RPC | ListTaskPushNotificationConfigs via JSON-RPC (regression for bug 11) |
| 44 | push-event-classify | REST | Webhook event classifier uses correct field names |
| 45 | resubscribe-jsonrpc | JSON-RPC | SubscribeToTask via JSON-RPC transport |
| 46 | multiple-artifacts | JSON-RPC | Multiple artifacts per task (>=2) |
| 47 | concurrent-streams | REST | 5 parallel SSE streams on same agent |
| 48 | list-context-filter | JSON-RPC | ListTasks with context_id filter |
| 49 | file-parts | JSON-RPC | Part::file_bytes with base64 content |
| 50 | history-length | JSON-RPC | history_length configuration via builder |

Tests 51-58: WebSocket, Multi-Tenancy & gRPC (transport.rs)

| # | Test | Transport | What it exercises |
|---|------|-----------|-------------------|
| 51 | ws-send-message | WebSocket | JSON-RPC SendMessage over WebSocket (feature-gated) |
| 52 | ws-streaming | WebSocket | SendStreamingMessage with multi-frame streaming (feature-gated) |
| 53 | tenant-isolation | JSON-RPC | Different tenants cannot see each other's tasks |
| 54 | tenant-id-independence | Direct store | Same task ID in different tenants doesn't collide |
| 55 | tenant-count | Direct store | TenantAwareInMemoryTaskStore::tenant_count() tracking |
| 56 | grpc-send-message | gRPC | JSON-RPC SendMessage over gRPC transport (feature-gated) |
| 57 | grpc-streaming | gRPC | SendStreamingMessage over gRPC transport (feature-gated) |
| 58 | grpc-get-task | gRPC | GetTask after SendMessage over gRPC (feature-gated) |

Note: Tests 51-52 require the websocket feature flag (cargo run -p agent-team --features websocket). Tests 56-58 require the grpc feature flag (cargo run -p agent-team --features grpc).

Tests 61-79: E2E Coverage Gaps (coverage_gaps.rs)

| # | Test | Category | What it exercises |
|---|------|----------|-------------------|
| 61 | batch-single-element | Batch JSON-RPC | Single-element batch [{...}] with SendMessage |
| 62 | batch-multi-request | Batch JSON-RPC | Multi-request batch: SendMessage + GetTask |
| 63 | batch-empty | Batch JSON-RPC | Empty batch [] returns parse error |
| 64 | batch-mixed | Batch JSON-RPC | Mixed valid/invalid requests in batch |
| 65 | batch-streaming-rejected | Batch JSON-RPC | SendStreamingMessage in batch returns error |
| 66 | batch-subscribe-rejected | Batch JSON-RPC | SubscribeToTask in batch returns error |
| 67 | real-auth-rejection | Auth | Interceptor rejects unauthenticated requests |
| 68 | extended-agent-card | Cards | GetExtendedAgentCard via JSON-RPC |
| 69 | dynamic-agent-card | Cards | DynamicAgentCardHandler runtime card generation |
| 70 | agent-card-caching | Caching | ETag, If-None-Match, 304 Not Modified |
| 71 | backpressure-lagged | Streaming | Slow reader skips lagged events (capacity=2) |
| 72 | push-global-limit | Push config | Global push config limit enforcement (DoS prevention) |
| 73 | webhook-url-scheme | Push config | Rejects non-HTTP webhook URL schemes (ftp://, file://) |
| 74 | combined-filter | ListTasks | Combined status + context_id filtering |
| 75 | latency-metrics | Metrics | Verifies on_request() callback fires |
| 76 | timeout-retryable | Retry | Timeout errors are classified as retryable |
| 77 | concurrent-cancels | Stress | 10 parallel cancel requests on same task |
| 78 | stale-page-token | Pagination | Graceful handling of invalid page tokens |
| 79 | agent-card-signing | Signing | ES256 key generation, JWS sign/verify, tamper detection (signing feature) |

Coverage by SDK Feature

| SDK Feature | Tests exercising it |
|-------------|---------------------|
| AgentExecutor trait | 1-5, 9-14, 22, 31-36, 38, 40, 46, 49, 51-52 |
| JSON-RPC dispatch | 1-2, 6-7, 9, 13, 15-16, 18-19, 23-24, 27, 29-34, 37-43, 45-46, 48-50, 53 |
| REST dispatch | 3-5, 8, 10-12, 14, 17, 20-22, 25-26, 28, 32, 35-36, 44, 47 |
| WebSocket dispatch | 51, 52 |
| SSE streaming | 2, 4, 14, 28, 35-36, 40, 44, 45, 47 |
| WebSocket streaming | 52 |
| GetTask | 6, 14, 25, 35 |
| ListTasks + pagination | 7, 16, 26, 37, 48, 50 |
| CancelTask | 14, 20, 21 |
| Push config CRUD | 8, 19, 27, 36, 43 |
| Agent card discovery | 17, 18, 41, 42 |
| ServerInterceptor | All tests (audit interceptor on every agent) |
| Metrics hooks | 29, 30, 39 |
| return_immediately | 22 |
| CancellationToken | 14 |
| Error handling | 15, 19, 20, 21, 24, 30 |
| Multi-agent orchestration | 10, 11, 12 |
| Concurrent requests | 23, 31, 32, 47 |
| SubscribeToTask resubscribe | 28, 45 |
| Multiple artifacts | 46 |
| File parts (binary) | 49 |
| History length config | 50 |
| Context ID filtering | 33, 48 |
| Multi-tenancy | 53, 54, 55 |
| TenantAwareInMemoryTaskStore | 53, 54, 55 |
| TenantContext::scope | 54, 55 |
| Batch JSON-RPC | 61, 62, 63, 64, 65, 66 |
| Auth rejection (interceptor) | 67 |
| GetExtendedAgentCard | 68 |
| DynamicAgentCardHandler | 69 |
| Agent card HTTP caching (ETag/304) | 70 |
| Agent card signing (JWS/ES256) | 79 |
| Backpressure / Lagged events | 71 |
| Push config global limit | 72 |
| Webhook URL scheme validation | 73 |
| Combined status+context filter | 74 |
| Metrics callbacks | 29, 30, 39, 75 |
| Timeout retryability (Bug #32) | 76 |
| Concurrent cancel stress | 77 |
| Stale page token handling | 78 |

Dedicated Integration Tests (Outside Agent-Team)

In addition to the 72 agent-team E2E tests (79 with optional transports and signing), the SDK includes dedicated integration test suites:

| Suite | Location | Tests | What it covers |
|-------|----------|-------|----------------|
| TLS/mTLS | crates/a2a-client/tests/tls_integration_tests.rs | 7 | Client cert validation, SNI hostname verification, unknown CA rejection, mutual TLS |
| WebSocket server | crates/a2a-server/tests/websocket_tests.rs | 7 | Send/stream, error handling, ping/pong, connection reuse, close frames |
| Memory & load stress | crates/a2a-server/tests/stress_tests.rs | 5 | 200 concurrent requests, sustained load (500 requests/10 waves), eviction under load, multi-tenant isolation (10×50), rapid connect/disconnect |

Features NOT Covered by E2E Tests

All previously uncovered features now have E2E test coverage. Agent card signing is covered by test 79 (#[cfg(feature = "signing")]).

Production Hardening

A checklist and guide for deploying a2a-rust agents in production.

Security

HTTPS

Always use HTTPS in production. a2a-rust doesn't bundle TLS — terminate TLS at your reverse proxy (nginx, Caddy, cloud load balancer) or use a TLS library:

Client ──HTTPS──→ [nginx/Caddy] ──HTTP──→ [a2a-rust agent]

CORS

Configure CORS for browser-based clients:

use a2a_protocol_sdk::server::CorsConfig;

// The dispatchers include CORS handling.
// Configure allowed origins for production.

Push Notification Security

The built-in HttpPushSender includes:

  • SSRF protection — Rejects private/loopback IPs after DNS resolution
  • Header injection prevention — Validates credentials for \r/\n characters
  • HTTPS enforcement — Optionally require HTTPS webhook URLs

Path Traversal Protection

The REST dispatcher automatically rejects:

  • .. in path segments
  • Percent-encoded %2E%2E and %2e%2e
  • Paths that escape the expected route hierarchy

Body Size Limits

| Limit | Value | Transport |
|-------|-------|-----------|
| Request body | 4 MiB | REST |
| Query string | 4 KiB | REST |
| Event size | 16 MiB (configurable) | SSE streaming |

Reliability

Executor Timeout

Prevent hung tasks from consuming resources forever:

use std::time::Duration;

RequestHandlerBuilder::new(executor)
    .with_executor_timeout(Duration::from_secs(300))
    .build()

Concurrent Stream Limits

Bound memory usage from SSE connections:

RequestHandlerBuilder::new(executor)
    .with_max_concurrent_streams(1000)
    .build()

Task Store Limits

Prevent unbounded memory growth:

use std::time::Duration;
use a2a_protocol_sdk::server::TaskStoreConfig;

RequestHandlerBuilder::new(executor)
    .with_task_store_config(TaskStoreConfig {
        max_capacity: Some(100_000),
        task_ttl: Some(Duration::from_secs(3600)),
        eviction_interval: 64,
        max_page_size: 1000,
    })
    .build()

Use TaskStore::count() for monitoring capacity utilization.

Graceful Shutdown

Implement on_shutdown in your executor for cleanup:

use std::future::Future;
use std::pin::Pin;

fn on_shutdown<'a>(&'a self) -> Pin<Box<dyn Future<Output = ()> + Send + 'a>> {
    Box::pin(async move {
        self.db_pool.close().await;
        self.cancel_token.cancel();
    })
}

Rate Limiting

Protect public-facing agents from abuse:

use a2a_protocol_sdk::server::{RateLimitInterceptor, RateLimitConfig};

RequestHandlerBuilder::new(executor)
    .with_interceptor(RateLimitInterceptor::new(RateLimitConfig {
        requests_per_window: 100,
        window_secs: 60,
    }))
    .build()

For advanced rate limiting (sliding windows, distributed counters), use a reverse proxy or implement a custom ServerInterceptor.

Client Retry & Reuse

When calling remote agents, build clients once and reuse them:

// Build once at startup
let client = ClientBuilder::new("http://agent.example.com")
    .with_retry_policy(RetryPolicy::default())
    .build()
    .unwrap();

// Reuse across all requests — client holds a connection pool
let result = client.send_message(params).await;

Observability

Structured Logging

Enable the tracing feature for structured logs:

[dependencies]
a2a-protocol-server = { version = "0.2", features = ["tracing"] }
tracing-subscriber = { version = "0.3", features = ["env-filter"] }

use tracing_subscriber::EnvFilter;

tracing_subscriber::fmt()
    .with_env_filter(
        EnvFilter::try_from_default_env()
            .unwrap_or_else(|_| EnvFilter::new("info"))
    )
    .json()  // JSON output for log aggregation
    .init();

Set RUST_LOG=debug for verbose output, RUST_LOG=a2a_protocol_server=debug for server-specific logs.

Health Checks

Add a health endpoint alongside your A2A dispatchers:

// Simple health check handler (sketch — `dispatcher` is your JSON-RPC
// or REST dispatcher instance, in scope at the call site)
async fn health_check(req: hyper::Request<impl hyper::body::Body>) -> hyper::Response<Full<Bytes>> {
    if req.uri().path() == "/health" {
        hyper::Response::new(Full::new(Bytes::from("ok")))
    } else {
        dispatcher.dispatch(req).await
    }
}

Performance

Connection Handling

Both dispatchers use hyper's HTTP/1.1 and HTTP/2 support via hyper_util::server::conn::auto::Builder. This automatically negotiates the best protocol version.

No Web Framework Overhead

a2a-rust works directly with hyper — no middleware framework overhead. Cross-cutting concerns (rate limiting, request logging, auth) are handled via the interceptor chain rather than a framework.

Event Queue Sizing

Tune the event queue for your workload:

// High-throughput: larger queues
.with_event_queue_capacity(256)

// Memory-constrained: smaller queues
.with_event_queue_capacity(16)

Deployment Checklist

  • HTTPS termination configured
  • CORS origins restricted to known clients
  • Executor timeout set
  • Max concurrent streams limited
  • Task store TTL and capacity configured
  • Rate limiting enabled for public-facing agents
  • A2A clients built once and reused (not per-request)
  • Structured logging enabled
  • Health check endpoint available
  • Push notification URLs restricted to HTTPS
  • Body/query size limits verified
  • Graceful shutdown implemented

Next Steps

GitHub Pages & CI/CD

a2a-rust uses GitHub Actions for continuous integration, crate publishing, and documentation deployment.

CI Pipeline

The CI workflow (.github/workflows/ci.yml) runs on every push and PR:

| Job | Description |
|-----|-------------|
| Format | cargo fmt --check — enforces consistent formatting |
| Clippy | cargo clippy -- -D warnings — catches common mistakes |
| Test | cargo test --workspace — runs all tests |
| Deny | cargo deny check — audits dependencies for vulnerabilities |
| Doc | cargo doc --no-deps — verifies documentation builds |
| Mutants | cargo mutants --workspace — zero surviving mutants required |

The Mutation Testing workflow (.github/workflows/mutants.yml) runs separately:

| Mode | Trigger | Scope |
|------|---------|-------|
| Full sweep | Nightly (03:00 UTC) + manual | All library crates |
| Incremental | Pull requests to main | Changed .rs files only |

The full sweep produces a mutation report artifact with caught/missed/unviable counts and a mutation score. Zero missed mutants is required — any surviving mutant fails the build.

All actions are SHA-pinned for supply chain security:

- uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2

Release Pipeline

The release workflow (.github/workflows/release.yml) triggers on version tags:

v0.2.0 tag → validate → ci + security → package → publish → github-release

Crates are published in dependency order:

  1. a2a-protocol-types (no internal deps)
  2. a2a-protocol-client + a2a-protocol-server (depend on types)
  3. a2a-protocol-sdk (depends on all three)

Documentation Deployment

The docs workflow builds the mdBook and deploys to GitHub Pages:

# .github/workflows/docs.yml
name: Deploy Documentation

on:
  push:
    branches: [main]
  workflow_dispatch:

permissions:
  contents: read
  pages: write
  id-token: write

concurrency:
  group: "pages"
  cancel-in-progress: false

jobs:
  build:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683
      - name: Install mdBook
        run: |
          mkdir -p $HOME/.local/bin
          curl -sSL https://github.com/rust-lang/mdBook/releases/download/v0.4.40/mdbook-v0.4.40-x86_64-unknown-linux-gnu.tar.gz | tar -xz -C $HOME/.local/bin
          echo "$HOME/.local/bin" >> $GITHUB_PATH
      - name: Build book
        run: mdbook build book
      - uses: actions/configure-pages@v5
      - uses: actions/upload-pages-artifact@v3
        with:
          path: book/book

  deploy:
    needs: build
    runs-on: ubuntu-latest
    environment:
      name: github-pages
      url: ${{ steps.deployment.outputs.page_url }}
    steps:
      - id: deployment
        uses: actions/deploy-pages@v4

Setting Up GitHub Pages

  1. Go to Settings → Pages in your GitHub repo
  2. Set Source to "GitHub Actions"
  3. Push to main to trigger the first deployment
  4. Your docs will be live at https://tomtom215.github.io/a2a-rust/

Building Locally

# Install mdBook
cargo install mdbook

# Build
mdbook build book

# Serve with hot reload
mdbook serve book --open

Cargo Documentation

Rust API docs are generated separately:

# Build API docs for all crates
cargo doc --workspace --no-deps --open

Consider deploying these alongside the book, or linking to docs.rs once published.

Next Steps

Pitfalls & Lessons Learned

A catalog of non-obvious problems encountered during development. Each entry documents a real issue and its solution.

Serde Pitfalls

Untagged enums hide inner errors

#[serde(untagged)] on SendMessageResponse and StreamResponse swallows the real deserialization error and replaces it with a generic "data did not match any variant" message.

Workaround: When debugging, temporarily switch to an externally tagged enum to see the real error. In production, log the raw JSON before attempting deserialization.

#[serde(default)] vs Option<T>

Using #[serde(default)] on a Vec<T> field means the field is present but empty when omitted from JSON. Using Option<Vec<T>> means it is absent (None).

The A2A spec distinguishes between "omitted" and "empty array" for fields like history and artifacts, so Option<Vec<T>> is the correct choice.

// Wrong: field is always present (empty vec if omitted)
#[serde(default)]
pub history: Vec<Message>,

// Correct: field is absent when not provided
#[serde(skip_serializing_if = "Option::is_none")]
pub history: Option<Vec<Message>>,

#[non_exhaustive] on enums breaks downstream matches

Adding #[non_exhaustive] to TaskState, ErrorCode, etc. forces downstream crates to include a wildcard arm. This is intentional for forward compatibility:

match task.status.state {
    TaskState::Completed => { /* ... */ }
    TaskState::Failed => { /* ... */ }
    _ => { /* Handle future states */ }
}

Hyper 1.x Pitfalls

Body is not Clone

Hyper 1.x Incoming body is consumed on read. You cannot read the body twice. Buffer it first:

use http_body_util::BodyExt;

let bytes = req.into_body().collect().await?.to_bytes();
// Now work from `bytes` (can be cloned, read multiple times)

Response builder panics on invalid header values

hyper::Response::builder().header(k, v) silently stores the error and panics when .body() is called.

Solution: Always use unwrap_or_else with a fallback response, or validate header values with .parse::<HeaderValue>() first. a2a-rust uses unwrap_or_else(|_| fallback_error_response()) in production paths.

size_hint() upper bound may be None

hyper::body::Body::size_hint().upper() returns None when Content-Length is absent. Always check for None before comparing against the body size limit:

if let Some(upper) = body.size_hint().upper() {
    if upper > MAX_BODY_SIZE {
        return Err(/* payload too large */);
    }
}

SSE Streaming Pitfalls

SSE parser must handle partial lines

SSE events may arrive split across TCP frames. The parser must buffer partial lines and only process complete \n-terminated lines. The EventBuffer in a2a-protocol-client handles this correctly, but naive lines() iterators will break on partial frames.
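The buffering idea can be sketched as follows. This is an illustration of the technique, not the SDK's `EventBuffer`: bytes are appended as chunks arrive, only complete `\n`-terminated lines are drained, and any trailing partial line stays buffered (CR stripping and the event-size cap are omitted here):

```rust
// Minimal line buffer: yields only complete '\n'-terminated lines,
// keeping any trailing partial line for the next chunk.
struct LineBuffer {
    buf: Vec<u8>,
}

impl LineBuffer {
    fn new() -> Self {
        LineBuffer { buf: Vec::new() }
    }

    fn push_chunk(&mut self, chunk: &[u8]) -> Vec<String> {
        self.buf.extend_from_slice(chunk);
        let mut lines = Vec::new();
        while let Some(pos) = self.buf.iter().position(|&b| b == b'\n') {
            let line: Vec<u8> = self.buf.drain(..=pos).collect();
            let line = &line[..line.len() - 1]; // strip the '\n'
            lines.push(String::from_utf8_lossy(line).into_owned());
        }
        lines
    }
}

fn main() {
    let mut buf = LineBuffer::new();
    // A partial line stays buffered...
    assert!(buf.push_chunk(b"data: hel").is_empty());
    // ...and is completed by the next chunk.
    assert_eq!(buf.push_chunk(b"lo\n"), vec!["data: hello".to_string()]);
}
```

A naive `str::lines()` over each chunk would instead emit `"data: hel"` and `"lo"` as two bogus lines.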

Memory limit on buffered SSE data

A malicious server can send an infinite SSE event (no \n\n terminator). The client parser enforces a 16 MiB cap on buffered event data to prevent OOM.

Push Notification Pitfalls

SSRF via webhook URLs

Push notification url fields can point to internal services (e.g., http://169.254.169.254/ — the cloud metadata endpoint).

Solution: The HttpPushSender resolves the URL and rejects private/loopback IP addresses after DNS resolution, not on the URL string alone. This prevents DNS rebinding attacks.

Header injection via credentials

Push notification credentials can contain newlines that inject additional HTTP headers.

Solution: The push sender validates that credential values contain no \r or \n characters before using them in HTTP headers.

Async / Tokio Pitfalls

Object-safe async traits need Pin<Box<dyn Future>>

Rust does not yet support async fn in traits that are used as dyn Trait. The TaskStore, AgentExecutor, and PushSender traits use explicit Pin<Box<dyn Future<Output = ...> + Send + 'a>> return types:

// The pattern for all object-safe async trait methods
fn my_method<'a>(
    &'a self,
    args: &'a Args,
) -> Pin<Box<dyn Future<Output = Result<T>> + Send + 'a>> {
    Box::pin(async move {
        // async code here
        Ok(result)
    })
}

Cancellation token cleanup

CancellationTokens that are never cancelled accumulate in the token map. The handler cleans up already-cancelled tokens before inserting new ones, and enforces a hard cap of 10,000 tokens.

Amortized eviction vs test expectations

Running O(n) eviction on every save() call is expensive. The task store amortizes eviction to every 64 writes. Tests that depend on eviction must call run_eviction() explicitly or account for the amortization interval.

std::sync::RwLock poisoning — fail-fast vs silent ignore

When a thread panics while holding a std::sync::RwLock, the lock becomes "poisoned." There are two strategies:

  1. Silent ignore (lock.read().ok()?) — returns None/no-op on poisoned locks. This hides bugs.
  2. Fail-fast (lock.read().expect("poisoned")) — panics immediately, surfacing the problem.

a2a-rust uses fail-fast. If you see a "lock poisoned" panic, the root cause is a prior panic in another thread. Fix that panic first.

Rate limiter atomics under read lock — CAS loop required

When multiple threads share a rate limiter bucket under a read lock, non-atomic multi-step operations (load window → check → store count) create TOCTOU races. Two threads can both see an old window and both reset the counter.

Solution: Use compare_exchange (CAS) to atomically swap the window number. Only one thread wins the CAS; others loop and retry with the updated state.
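A simplified sketch of the window-rotation CAS (field names hypothetical; a production implementation would also need to make the counter reset race-free, e.g. by packing window and count into a single atomic word):

```rust
use std::sync::atomic::{AtomicU64, Ordering};

// Hypothetical bucket: current window number plus a request counter.
struct Bucket {
    window: AtomicU64,
    count: AtomicU64,
}

impl Bucket {
    fn try_acquire(&self, now_window: u64, limit: u64) -> bool {
        loop {
            let seen = self.window.load(Ordering::Acquire);
            if seen == now_window {
                // Same window: atomically count the request.
                return self.count.fetch_add(1, Ordering::AcqRel) < limit;
            }
            // New window: exactly one thread wins the CAS and resets the
            // counter; losers loop and observe the updated window.
            if self
                .window
                .compare_exchange(seen, now_window, Ordering::AcqRel, Ordering::Acquire)
                .is_ok()
            {
                self.count.store(1, Ordering::Release);
                return true;
            }
        }
    }
}

fn main() {
    let bucket = Bucket { window: AtomicU64::new(0), count: AtomicU64::new(0) };
    assert!(bucket.try_acquire(1, 2));  // first request of window 1
    assert!(bucket.try_acquire(1, 2));  // second — at the limit
    assert!(!bucket.try_acquire(1, 2)); // third — rejected
    assert!(bucket.try_acquire(2, 2));  // window rolled over — allowed again
}
```

Contrast with the buggy load/check/store sequence, where two threads can both observe a stale window and both reset the counter.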

Transport Pitfalls

Query string parameters must be URL-encoded

When building REST transport query strings from JSON values, parameter values must be percent-encoded per RFC 3986. A value like status=active&role=admin without encoding splits into multiple query parameters instead of remaining a single value.

Solution: The build_query_string() function in the REST transport encodes all non-unreserved characters (A-Z a-z 0-9 - . _ ~).
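The encoding rule can be sketched as follows (illustrative only, not the SDK's `build_query_string`): every byte outside the RFC 3986 unreserved set is replaced by its `%XX` form.

```rust
// Percent-encode everything outside the RFC 3986 unreserved set
// (A-Z a-z 0-9 - . _ ~).
fn percent_encode(value: &str) -> String {
    value
        .bytes()
        .map(|b| match b {
            b'A'..=b'Z' | b'a'..=b'z' | b'0'..=b'9' | b'-' | b'.' | b'_' | b'~' => {
                (b as char).to_string()
            }
            other => format!("%{:02X}", other),
        })
        .collect()
}

fn main() {
    // '=' and '&' are encoded, so the value stays a single parameter.
    assert_eq!(
        percent_encode("status=active&role=admin"),
        "status%3Dactive%26role%3Dadmin"
    );
    // Unreserved characters pass through untouched.
    assert_eq!(percent_encode("abc-123_~."), "abc-123_~.");
}
```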

WebSocket stream termination must use structured parsing

Detecting stream completion by checking text.contains("stream_complete") is fragile — it false-positives on any payload text containing that substring, and misses terminal status updates that don't contain that exact string.

Solution: Deserialize the JSON-RPC frame and check the result object for terminal task states (completed, failed, canceled, rejected) or the stream_complete sentinel.

Pre-bind listeners for all server types when building agent cards

The gRPC dispatcher had the same placeholder-URL bug as the HTTP dispatchers: the agent card was built before the server bound to a port, so the card contained "http://placeholder". This applied to any transport that binds its own port.

Solution: Pre-bind a TcpListener, extract the address, build the handler with the correct URL, then pass the listener via serve_with_listener() (gRPC) or the accept loop (HTTP).

Background tasks cannot propagate errors — log them

In a tokio::spawn-ed task, errors have no caller to return to. Using let _ = store.save(...).await silently drops failures, causing task state to diverge between the event queue and the persistent store.

Solution: Use if let Err(e) = store.save(...).await { trace_error!(...) } so failures are observable via logs and metrics.

Push Config Store Pitfalls

Per-resource limits are not enough — add global limits

A per-task push config limit (e.g., 100 configs per task) does not prevent an attacker from creating millions of tasks with 100 configs each. Always pair per-resource limits with a global cap (e.g., max_total_configs = 100_000). The global check must run before the per-resource check in the write path.
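The ordering can be sketched like this (constants and function name hypothetical; the real store tracks counts internally):

```rust
// Hypothetical limits mirroring the text above.
const MAX_PER_TASK: usize = 100;
const MAX_TOTAL: usize = 100_000;

// The global cap is checked first so an attacker cannot sidestep it by
// spreading configs across many tasks that are each under their own cap.
fn can_insert(total_configs: usize, task_configs: usize) -> Result<(), &'static str> {
    if total_configs >= MAX_TOTAL {
        return Err("global push config limit reached");
    }
    if task_configs >= MAX_PER_TASK {
        return Err("per-task push config limit reached");
    }
    Ok(())
}

fn main() {
    assert!(can_insert(0, 0).is_ok());
    // Global cap trips even when the individual task is under its limit.
    assert!(can_insert(MAX_TOTAL, 0).is_err());
    // Per-task cap still applies below the global cap.
    assert!(can_insert(500, MAX_PER_TASK).is_err());
}
```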

Webhook URL scheme validation

Checking for private IPs and hostnames in webhook URLs is insufficient. URLs like ftp://evil.com/hook or file:///etc/passwd bypass IP-based SSRF checks entirely. Always validate that the URL scheme is http or https before performing any further validation.
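A minimal sketch of the scheme gate that must run before any IP or hostname check (illustrative; the SDK's validator performs further checks after this):

```rust
// Accept only http(s) URLs before doing any deeper SSRF validation.
fn has_allowed_scheme(url: &str) -> bool {
    let lower = url.to_ascii_lowercase();
    lower.starts_with("http://") || lower.starts_with("https://")
}

fn main() {
    assert!(has_allowed_scheme("https://example.com/hook"));
    assert!(has_allowed_scheme("HTTP://example.com/hook")); // case-insensitive
    // These bypass IP-based checks entirely, so they are rejected up front.
    assert!(!has_allowed_scheme("ftp://evil.com/hook"));
    assert!(!has_allowed_scheme("file:///etc/passwd"));
}
```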

Performance Pitfalls

Vec<u8> vs Bytes in retry loops

Cloning a Vec<u8> inside a retry loop allocates a full heap copy each time. Use bytes::Bytes (reference-counted) so that .clone() is just an atomic reference count increment. This matters for push notification delivery where large payloads may be retried 3+ times.

Graceful Shutdown Pitfalls

Bound executor cleanup with a timeout

executor.on_shutdown().await can hang indefinitely if the executor's cleanup routine blocks. Always wrap with tokio::time::timeout() — the default is 10 seconds. Users can override via shutdown_with_timeout(duration).

gRPC Pitfalls

Map all tonic status codes, not just the common ones

Only mapping 4 gRPC status codes to A2A error codes loses semantic information for Unauthenticated, PermissionDenied, ResourceExhausted, etc. Map all relevant codes explicitly so clients can distinguish between error categories.

Feature-gated code paths need their own dogfood pass

The gRPC path behind #[cfg(feature = "grpc")] had the exact same placeholder URL bug that was already fixed for HTTP (Bug #12 → Bug #18). Feature-gated code is easy to miss during reviews. Always re-verify known bug patterns across all feature gates.

Cancel / State Machine Pitfalls

Check terminal state before emitting cancel

If an executor completes between the handler's cancel check and the cancel signal arrival, unconditionally emitting TaskState::Canceled causes an invalid Completed → Canceled state transition. Guard cancel emission with cancellation_token.is_cancelled() or a terminal-state check.
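A terminal-state guard can be sketched as follows (TaskState variants per the A2A lifecycle; `emit_cancel` is an illustrative helper, not the SDK's API):

```rust
#[derive(Debug, PartialEq)]
enum TaskState {
    Working,
    Completed,
    Failed,
    Canceled,
}

fn is_terminal(state: &TaskState) -> bool {
    matches!(
        state,
        TaskState::Completed | TaskState::Failed | TaskState::Canceled
    )
}

// Only emit Canceled if the task has not already reached a terminal state.
fn emit_cancel(current: &TaskState) -> Option<TaskState> {
    if is_terminal(current) {
        None // Completed → Canceled would be an invalid transition
    } else {
        Some(TaskState::Canceled)
    }
}

fn main() {
    assert_eq!(emit_cancel(&TaskState::Working), Some(TaskState::Canceled));
    assert_eq!(emit_cancel(&TaskState::Completed), None);
}
```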

Re-check cancellation between multi-phase work

If an executor performs multiple phases (e.g., two artifact emissions), check cancellation between phases. Otherwise, cancellation between phases is delayed until the next natural check point.

Server Accept Loop Pitfalls

break vs continue on transient accept errors

Using Err(_) => break in a TCP accept loop kills the entire server on a single transient error (e.g., EMFILE when the file descriptor limit is reached). Use Err(_) => continue to skip the bad connection and keep serving. Log the error for observability but do not let it take down the server.

Workspace / Cargo Pitfalls

cargo-fuzz needs its own workspace

The fuzz/ directory contains its own Cargo.toml with [workspace] to prevent cargo-fuzz from conflicting with the main workspace. The fuzz crate references a2a-protocol-types via a relative path dependency.

Feature unification across workspace

Enabling a feature in one crate (e.g., signing in a2a-protocol-types) enables it for all crates in the workspace during cargo test --workspace. Use --no-default-features or per-crate test commands when testing feature gates.

Testing Pitfalls

HashMap iteration order is non-deterministic

The InMemoryTaskStore uses a HashMap internally. list() must sort results by task ID before applying pagination; otherwise, page boundaries shift between runs and tests become flaky.
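The fix reduces to sorting before slicing the page (sketch; the real `list()` operates on stored tasks, not bare IDs):

```rust
// Sort by task ID before pagination so page boundaries are stable
// across runs despite HashMap iteration order.
fn list_page(mut ids: Vec<String>, offset: usize, page_size: usize) -> Vec<String> {
    ids.sort();
    ids.into_iter().skip(offset).take(page_size).collect()
}

fn main() {
    let ids = vec![
        "task-b".to_string(),
        "task-a".to_string(),
        "task-c".to_string(),
    ];
    // Pages are deterministic regardless of insertion/iteration order.
    assert_eq!(list_page(ids.clone(), 0, 1), vec!["task-a".to_string()]);
    assert_eq!(list_page(ids, 1, 1), vec!["task-b".to_string()]);
}
```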

Percent-encoded path traversal

Checking path.contains("..") is insufficient. Attackers can use %2E%2E (or mixed-case %2e%2E) to bypass the check.

Solution: The REST dispatcher percent-decodes the path before checking for .. sequences.

Next Steps

Architecture Decision Records

Key design decisions for a2a-rust, documented as ADRs. Each record captures the context, decision, and rationale.

ADR 0001: Workspace Crate Structure

Status: Accepted

Context: The A2A protocol has three distinct concerns with different dependency trees: types (pure data, no I/O), client (HTTP sending), and server (HTTP receiving). A single-crate approach forces every user to compile the full dependency tree regardless of need.

Decision: Four crates in a Cargo workspace:

| Crate | Purpose | Key Dependencies |
|-------|---------|------------------|
| a2a-protocol-types | Wire types only | serde, serde_json |
| a2a-protocol-client | HTTP client | hyper, tokio |
| a2a-protocol-server | Server framework | hyper, tokio |
| a2a-protocol-sdk | Umbrella re-exports | All above |

Rationale: An agent server implementor doesn't pay for the client dependency tree and vice versa. The types crate is usable without any async runtime — useful for codegen, validation, or non-HTTP transports.

ADR 0002: Dependency Philosophy

Status: Accepted

Context: Rust SDK quality is inversely correlated with transitive dependency count. Each dependency adds compile time, supply chain attack surface, version conflicts, and potential license issues.

Decision: Minimal dependency footprint:

  • No web frameworks (axum, actix, warp)
  • No TLS bundled (bring your own or use a proxy)
  • No logging framework forced (optional tracing feature)
  • In-tree SSE parser instead of third-party crate
  • serde + hyper as the only heavyweight deps

Rationale: A production-grade SDK must work in corporate environments with strict dependency audits, embedded/constrained environments, and without forcing users into specific TLS or logging frameworks.

ADR 0003: Async Runtime Strategy

Status: Accepted

Context: Rust async code is runtime-agnostic at the language level, but hyper 1.x uses tokio internally. Making the SDK runtime-agnostic would require wrapping every I/O call behind an abstraction layer.

Decision: Tokio is the mandatory async runtime. It is a required dependency (not optional, not feature-gated) for the I/O crates. a2a-protocol-types has no async runtime dependency.

Rationale: >95% of Rust async production code runs on tokio. The complexity cost of runtime abstraction is not justified by the ~5% of users on other runtimes.

ADR 0004: Transport Abstraction Design

Status: Accepted

Context: A2A defines three transport bindings (JSON-RPC, REST, gRPC). Both client and server share protocol logic that must not be duplicated across transport implementations.

Decision: Three-layer architecture:

Dispatcher (transport) → RequestHandler (protocol) → AgentExecutor (user logic)
  • Dispatchers handle HTTP-level concerns (routing, content types, CORS)
  • RequestHandler contains all protocol logic (task lifecycle, stores, streaming)
  • AgentExecutor is the user's entry point

The Transport trait (client) and Dispatcher trait (server) make transports pluggable. Both share the same handler/client core.

Rationale: Adding gRPC support later requires only a new dispatcher/transport — zero changes to protocol logic or user code.
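The layering can be sketched with plain synchronous traits. This is an illustrative stand-in, not the SDK's real API — the actual traits are async and carry richer types, and `EchoAgent` here is a hypothetical example executor:

```rust
// Illustrative sketch of the dispatcher → handler → executor layering.
// Names and signatures are simplified stand-ins, not the real SDK API.

/// User logic: turns an input into a result.
trait AgentExecutor {
    fn execute(&self, input: &str) -> String;
}

/// Protocol layer: owns lifecycle concerns, delegates work to the executor.
struct RequestHandler<E: AgentExecutor> {
    executor: E,
}

impl<E: AgentExecutor> RequestHandler<E> {
    fn handle(&self, payload: &str) -> String {
        // The real handler would validate, create a task, persist it,
        // and stream updates; here we only delegate.
        self.executor.execute(payload)
    }
}

/// Transport layer: maps a wire format onto the handler.
trait Dispatcher {
    fn dispatch(&self, raw_request: &str) -> String;
}

struct JsonRpcDispatcher<E: AgentExecutor> {
    handler: RequestHandler<E>,
}

impl<E: AgentExecutor> Dispatcher for JsonRpcDispatcher<E> {
    fn dispatch(&self, raw_request: &str) -> String {
        // The real dispatcher would parse the JSON-RPC envelope,
        // negotiate content types, and apply CORS.
        format!("{{\"result\":\"{}\"}}", self.handler.handle(raw_request))
    }
}

// Hypothetical user agent: only the executor layer is user-written.
struct EchoAgent;
impl AgentExecutor for EchoAgent {
    fn execute(&self, input: &str) -> String {
        format!("echo: {input}")
    }
}

fn main() {
    let dispatcher = JsonRpcDispatcher {
        handler: RequestHandler { executor: EchoAgent },
    };
    println!("{}", dispatcher.dispatch("hi"));
}
```

Swapping `JsonRpcDispatcher` for a gRPC equivalent touches only the outermost layer, which is the point of the ADR.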

ADR 0005: SSE Streaming Design

Status: Accepted

Context: A2A streaming uses Server-Sent Events (SSE). Rust lacks a battle-tested, zero-dep SSE library that is hyper 1.x native.

Decision: In-tree SSE implementation for both client parsing and server emission:

  • Client parser: ~220 lines, handles partial TCP frames, enforces 16 MiB buffer cap
  • Server emitter: ~200 lines, formats SSE data: lines with proper \n\n terminators
  • Zero additional dependencies beyond hyper

Rationale: Third-party SSE crates either use older hyper versions, carry unnecessary dependencies, or lack proper backpressure. The in-tree implementation is small enough to audit, test, and maintain.
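The core of such a parser is small. The sketch below shows the general approach — buffer partial TCP frames, emit complete events on the blank-line terminator, enforce a cap — and is a simplified illustration, not the SDK's actual parser:

```rust
// Minimal SSE-parsing sketch: accumulate chunks, extract events that are
// terminated by a blank line ("\n\n"), and enforce a buffer cap.
// Simplified illustration — not the SDK's in-tree parser.

const MAX_BUFFER: usize = 16 * 1024 * 1024; // 16 MiB cap, as in the SDK

#[derive(Default)]
struct SseParser {
    buf: String,
}

impl SseParser {
    /// Feed one chunk; return any complete `data:` payloads.
    fn feed(&mut self, chunk: &str) -> Result<Vec<String>, &'static str> {
        self.buf.push_str(chunk);
        if self.buf.len() > MAX_BUFFER {
            return Err("SSE buffer cap exceeded");
        }
        let mut events = Vec::new();
        while let Some(end) = self.buf.find("\n\n") {
            // Drain the complete event, leaving any partial tail buffered.
            let raw: String = self.buf.drain(..end + 2).collect();
            let data: Vec<&str> = raw
                .lines()
                .filter_map(|l| l.strip_prefix("data:"))
                .map(|l| l.trim_start())
                .collect();
            if !data.is_empty() {
                events.push(data.join("\n"));
            }
        }
        Ok(events)
    }
}

fn main() {
    let mut p = SseParser::default();
    // A chunk that splits an event mid-payload yields nothing yet…
    assert!(p.feed("data: {\"par").unwrap().is_empty());
    // …until the terminator arrives in a later chunk.
    let events = p.feed("tial\"}\n\n").unwrap();
    assert_eq!(events, vec!["{\"partial\"}".to_string()]);
    println!("parsed: {:?}", events);
}
```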

ADR 0006: Mutation Testing as a Required Quality Gate

Status: Accepted

Context: The test suite includes unit, integration, property, fuzz, and E2E dogfood tests — but none of these measure whether the tests actually detect real bugs. A test suite can achieve 100% line coverage with trivial assertions. At multi-data-center deployment scales, the bugs that escape traditional testing have the highest blast radius.

Decision: Adopt cargo-mutants as a mandatory quality gate with zero surviving mutants required across all library crates. CI runs a full nightly sweep and incremental PR checks on changed files. Configuration is centralized in mutants.toml.

Rationale: Mutation testing is the only technique that directly measures fault detection capability. It provides an objective, automated answer to "would this test suite catch a real bug at this location?" The compute cost is managed through incremental mode, nightly scheduling, and exclusion of unproductive targets.

Summary

| ADR | Key Decision |
| --- | --- |
| 0001 | Four-crate workspace (types, client, server, sdk) |
| 0002 | Minimal dependencies, no bundled framework |
| 0003 | Tokio as mandatory runtime |
| 0004 | Three-layer architecture (dispatcher → handler → executor) |
| 0005 | In-tree SSE parser/emitter, zero additional deps |
| 0006 | cargo-mutants as mandatory quality gate, zero surviving mutants |

The full ADR documents are in the docs/adr/ directory.

Next Steps

Configuration Reference

Complete reference of all configuration options across a2a-rust crates.

Server Configuration

RequestHandlerBuilder

| Option | Type | Default | Description |
| --- | --- | --- | --- |
| with_agent_card | AgentCard | None | Discovery card for /.well-known/agent.json |
| with_task_store | impl TaskStore | InMemoryTaskStore | Custom task storage backend |
| with_task_store_config | TaskStoreConfig | No limits | TTL and capacity for default store |
| with_push_config_store | impl PushConfigStore | InMemoryPushConfigStore | Custom push config storage |
| with_push_sender | impl PushSender | None | Webhook delivery implementation |
| with_interceptor | impl ServerInterceptor | Empty chain | Server middleware |
| with_executor_timeout | Duration | None | Max time for executor completion |
| with_event_queue_capacity | usize | 64 | Bounded channel size per stream |
| with_max_event_size | usize | 16 MiB | Max serialized SSE event size |
| with_max_concurrent_streams | usize | Unbounded | Limit concurrent SSE connections |
| with_event_queue_write_timeout | Duration | 5s | Write timeout for event queue sends |
| with_metrics | impl Metrics | NoopMetrics | Metrics observer for handler activity |
| with_handler_limits | HandlerLimits | See below | Configurable validation limits |

HandlerLimits

| Field | Type | Default | Description |
| --- | --- | --- | --- |
| max_id_length | usize | 1,024 | Maximum task/context ID length |
| max_metadata_size | usize | 1 MiB | Maximum serialized metadata size |
| max_cancellation_tokens | usize | 10,000 | Cleanup sweep threshold |
| max_token_age | Duration | 1 hour | Stale token eviction age |
| push_delivery_timeout | Duration | 5s | Per-webhook delivery timeout |

TaskStoreConfig

| Field | Type | Default | Description |
| --- | --- | --- | --- |
| max_capacity | Option<usize> | 10,000 | Maximum stored tasks; oldest terminal tasks evicted on overflow |
| task_ttl | Option<Duration> | 1 hour | TTL for completed/failed tasks |
| eviction_interval | u64 | 64 | Writes between automatic eviction sweeps |
| max_page_size | u32 | 1,000 | Maximum tasks per page in list queries |
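A minimal sketch of how these three knobs can interact — amortized sweeps, a TTL check, then capacity eviction of the oldest entries. This is a hypothetical single-threaded illustration, not the SDK's store:

```rust
use std::collections::VecDeque;
use std::time::{Duration, Instant};

// Sketch of TTL + capacity eviction for terminal tasks, in the spirit of
// TaskStoreConfig. Hypothetical, simplified single-threaded store.

struct StoredTask {
    id: String,
    terminal_at: Instant,
}

struct Store {
    tasks: VecDeque<StoredTask>, // oldest-first
    max_capacity: usize,
    task_ttl: Duration,
    writes: u64,
    eviction_interval: u64,
}

impl Store {
    fn insert(&mut self, id: &str, now: Instant) {
        self.tasks.push_back(StoredTask { id: id.into(), terminal_at: now });
        self.writes += 1;
        // Sweep only every `eviction_interval` writes to amortize cost.
        if self.writes % self.eviction_interval == 0 {
            self.evict(now);
        }
    }

    fn evict(&mut self, now: Instant) {
        // Drop tasks past their TTL…
        self.tasks
            .retain(|t| now.duration_since(t.terminal_at) < self.task_ttl);
        // …then evict the oldest until back under capacity.
        while self.tasks.len() > self.max_capacity {
            self.tasks.pop_front();
        }
    }
}

fn main() {
    let now = Instant::now();
    let mut store = Store {
        tasks: VecDeque::new(),
        max_capacity: 3,
        task_ttl: Duration::from_secs(3600),
        writes: 0,
        eviction_interval: 1, // sweep on every write for the demo
    };
    for i in 0..5 {
        store.insert(&format!("task-{i}"), now);
    }
    // Capacity 3: the two oldest tasks were evicted.
    assert_eq!(store.tasks.len(), 3);
    assert_eq!(store.tasks.front().unwrap().id, "task-2");
    println!("retained: {}", store.tasks.len());
}
```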

InMemoryPushConfigStore

| Constructor | Default | Description |
| --- | --- | --- |
| ::new() | 100 | Default max push configs per task |
| ::with_max_configs_per_task(N) | — | Custom per-task push config limit |

DispatchConfig

Shared configuration for both JSON-RPC and REST dispatchers. Pass to JsonRpcDispatcher::with_config() or RestDispatcher::with_config().

| Field | Type | Default | Description |
| --- | --- | --- | --- |
| max_request_body_size | usize | 4 MiB | Larger bodies return 413 |
| body_read_timeout | Duration | 30s | Slow-loris protection |
| max_query_string_length | usize | 4,096 | REST only; longer queries return 414 |
| sse_keep_alive_interval | Duration | 30s | Periodic keep-alive comment interval for SSE streams |
| sse_channel_capacity | usize | 64 | SSE response body channel buffer size |

GrpcConfig

Configuration for the gRPC dispatcher (requires grpc feature).

| Field | Type | Default | Description |
| --- | --- | --- | --- |
| max_message_size | usize | 4 MiB | Maximum inbound/outbound message size |
| concurrency_limit | usize | 256 | Max concurrent gRPC requests per connection |
| stream_channel_capacity | usize | 64 | Bounded channel for streaming responses |

PushRetryPolicy

Configurable retry policy for HttpPushSender. Pass via HttpPushSender::with_retry_policy().

| Field | Type | Default | Description |
| --- | --- | --- | --- |
| max_attempts | usize | 3 | Maximum delivery attempts |
| backoff | Vec<Duration> | [1s, 2s] | Backoff durations between retries |

RateLimitConfig

| Field | Type | Default | Description |
| --- | --- | --- | --- |
| requests_per_window | u64 | 100 | Max requests per caller per window |
| window_secs | u64 | 60 | Window duration in seconds |

Internal Limits

| Limit | Value | Description |
| --- | --- | --- |
| Event queue type | broadcast | Fan-out to multiple subscribers; slow readers skip missed events |
| Rate limiter cleanup interval | 256 checks | Stale buckets (from departed callers) evicted every 256 check() calls |
| Rate limiter window CAS | Lock-free | Window transitions use compare_exchange to avoid TOCTOU races |
| Credential store poisoning | Fail-fast | InMemoryCredentialsStore panics on poisoned locks rather than returning None |
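The CAS-based window transition mentioned above can be sketched with standard atomics. This is a simplified fixed-window illustration of the pattern, not the SDK's limiter — a plain load-then-store would let two threads both "reset" the window and miscount, which the compare_exchange prevents:

```rust
use std::sync::atomic::{AtomicU64, Ordering};

// Sketch of a fixed-window rate limit check whose window transition uses
// compare_exchange, avoiding the TOCTOU race of a load-then-store.
// Simplified single-bucket illustration, not the SDK's rate limiter.

struct Bucket {
    window_start: AtomicU64, // epoch seconds of the current window
    count: AtomicU64,
}

impl Bucket {
    fn check(&self, now_secs: u64, window_secs: u64, limit: u64) -> bool {
        let start = self.window_start.load(Ordering::Acquire);
        if now_secs >= start + window_secs {
            // Try to open a new window; exactly one thread wins the CAS
            // and resets the counter, losers count against the new window.
            if self
                .window_start
                .compare_exchange(start, now_secs, Ordering::AcqRel, Ordering::Acquire)
                .is_ok()
            {
                self.count.store(0, Ordering::Release);
            }
        }
        self.count.fetch_add(1, Ordering::AcqRel) < limit
    }
}

fn main() {
    let b = Bucket {
        window_start: AtomicU64::new(0),
        count: AtomicU64::new(0),
    };
    // Limit 2 per 60 s window: the third request in a window is rejected.
    assert!(b.check(0, 60, 2));
    assert!(b.check(1, 60, 2));
    assert!(!b.check(2, 60, 2));
    // A new window resets the count.
    assert!(b.check(61, 60, 2));
    println!("ok");
}
```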

Client Configuration

ClientBuilder

| Option | Type | Default | Description |
| --- | --- | --- | --- |
| with_protocol_binding | &str | Auto-detect | Transport: "JSONRPC", "REST", or "GRPC" |
| with_timeout | Duration | 30s | Per-request timeout |
| with_connection_timeout | Duration | 10s | TCP connection timeout |
| with_stream_connect_timeout | Duration | 30s | SSE connect timeout |
| with_retry_policy | RetryPolicy | None | Retry on transient errors |
| with_accepted_output_modes | Vec<String> | ["text/plain", "application/json"] | MIME types accepted |
| with_history_length | u32 | None | Messages in responses |
| with_return_immediately | bool | false | Don't wait for completion |
| with_interceptor | impl CallInterceptor | Empty chain | Client middleware |

GrpcTransportConfig

Configuration for the gRPC client transport (requires grpc feature).

| Field | Type | Default | Description |
| --- | --- | --- | --- |
| timeout | Duration | 30s | Per-request timeout |
| connect_timeout | Duration | 10s | Connection timeout |
| max_message_size | usize | 4 MiB | Maximum message size |
| stream_channel_capacity | usize | 64 | Streaming response buffer |

RetryPolicy

| Field | Type | Default | Description |
| --- | --- | --- | --- |
| max_retries | u32 | 3 | Maximum retry attempts |
| initial_backoff | Duration | 500ms | Backoff before first retry |
| max_backoff | Duration | 30s | Caps exponential growth |
| backoff_multiplier | f64 | 2.0 | Multiplier per retry |
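The delay sequence implied by these four fields can be computed directly. A small sketch of that arithmetic (illustrative only — the SDK's actual retry loop also decides *which* errors are retryable):

```rust
use std::time::Duration;

// Compute the backoff schedule implied by RetryPolicy's fields:
// initial_backoff grows by backoff_multiplier per retry, capped at
// max_backoff. Illustrative computation, not the SDK's retry loop.

fn backoff_schedule(
    max_retries: u32,
    initial_backoff: Duration,
    max_backoff: Duration,
    backoff_multiplier: f64,
) -> Vec<Duration> {
    let mut delay = initial_backoff;
    let mut schedule = Vec::new();
    for _ in 0..max_retries {
        schedule.push(delay.min(max_backoff)); // cap exponential growth
        delay = delay.mul_f64(backoff_multiplier);
    }
    schedule
}

fn main() {
    // Defaults from the table: 3 retries, 500 ms initial, 30 s cap, ×2.0.
    let schedule =
        backoff_schedule(3, Duration::from_millis(500), Duration::from_secs(30), 2.0);
    assert_eq!(
        schedule,
        vec![
            Duration::from_millis(500),
            Duration::from_millis(1000),
            Duration::from_millis(2000),
        ]
    );
    println!("{:?}", schedule);
}
```

With the defaults, the client waits 500 ms, 1 s, then 2 s between attempts; the 30 s cap only matters for longer retry counts.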

SSE Parser Limits

| Limit | Value | Description |
| --- | --- | --- |
| Buffer cap | 16 MiB | Max buffered SSE data (aligned with server) |
| Connect timeout | 30s (default) | Initial connection timeout |

HTTP Caching (Agent Card)

| Header | Default | Description |
| --- | --- | --- |
| Cache-Control | public, max-age=60 | Configurable max-age |
| ETag | Auto-computed | Content hash |
| Last-Modified | Auto-set | Timestamp of last change |
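The ETag flow amounts to: hash the card body, and answer 304 Not Modified when the client's If-None-Match matches. A sketch of that revalidation logic, using std's DefaultHasher as a stand-in (the SDK's actual hash function is not specified here):

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

// Sketch of ETag-based revalidation for the agent card endpoint.
// DefaultHasher is a stand-in hash; the SDK's choice may differ.

fn etag_for(body: &str) -> String {
    let mut h = DefaultHasher::new();
    body.hash(&mut h);
    format!("\"{:016x}\"", h.finish()) // ETags are quoted strings
}

/// Return the HTTP status: 304 if the client's cached copy is fresh.
fn respond(body: &str, if_none_match: Option<&str>) -> u16 {
    let etag = etag_for(body);
    match if_none_match {
        Some(tag) if tag == etag => 304, // client copy matches: no body
        _ => 200,                        // full body + fresh ETag header
    }
}

fn main() {
    let card = "{\"name\":\"demo-agent\"}";
    let etag = etag_for(card);
    assert_eq!(respond(card, None), 200);
    assert_eq!(respond(card, Some(etag.as_str())), 304);
    // Changed content invalidates the cached ETag.
    assert_eq!(respond("{\"name\":\"v2\"}", Some(etag.as_str())), 200);
    println!("etag: {etag}");
}
```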

Feature Flags

a2a-protocol-server

| Feature | Default | Description |
| --- | --- | --- |
| signing | Off | Agent card signing |
| tracing | Off | Structured logging via the tracing crate |
| sqlite | Off | SQLite-backed task and push config stores via sqlx |
| websocket | Off | WebSocket transport via tokio-tungstenite |
| grpc | Off | gRPC transport via tonic |
| otel | Off | OpenTelemetry metrics via opentelemetry-otlp |

a2a-protocol-client

| Feature | Default | Description |
| --- | --- | --- |
| signing | Off | Agent card signing verification |
| tracing | Off | Structured logging via the tracing crate |
| tls-rustls | Off | HTTPS via rustls (no OpenSSL dependency) |
| websocket | Off | WebSocket transport via tokio-tungstenite |
| grpc | Off | gRPC transport via tonic |

a2a-protocol-types

| Feature | Default | Description |
| --- | --- | --- |
| signing | Off | JWS/ES256 agent card signing (RFC 8785 canonicalization) |

a2a-protocol-sdk (umbrella)

| Feature | Default | Description |
| --- | --- | --- |
| signing | Off | Enables signing in all sub-crates |
| tracing | Off | Enables tracing in client and server |
| tls-rustls | Off | Enables tls-rustls in client |
| grpc | Off | Enables grpc in client and server |
| otel | Off | Enables otel in the server |

Environment Variables

| Variable | Description |
| --- | --- |
| RUST_LOG | Log level filter (when the tracing feature is enabled) |

Examples:

RUST_LOG=info              # Info and above
RUST_LOG=debug             # Debug and above
RUST_LOG=a2a_protocol_server=debug  # Debug for server crate only
RUST_LOG=a2a_protocol_server=trace,a2a_protocol_client=debug  # Per-crate levels

Next Steps

API Quick Reference

A condensed overview of all public types, traits, and functions across the a2a-rust crates.

Wire Types (a2a-protocol-types)

Core Types

| Type | Description |
| --- | --- |
| Task | Unit of work with ID, status, history, artifacts |
| TaskId | Newtype wrapper for task identifiers |
| TaskState | Enum: Submitted, Working, InputRequired, AuthRequired, Completed, Failed, Canceled, Rejected |
| TaskStatus | State + optional message + timestamp |
| TaskVersion | Monotonically increasing version number |
| ContextId | Conversation context identifier |

Messages

| Type | Description |
| --- | --- |
| Message | Structured payload with ID, role, parts |
| MessageId | Newtype wrapper for message identifiers |
| MessageRole | Enum: User, Agent |
| Part | Content unit: text, file, or data |
| PartContent | Enum: Text, File, Data |

Artifacts

| Type | Description |
| --- | --- |
| Artifact | Result produced by an agent |
| ArtifactId | Newtype wrapper for artifact identifiers |

Events

| Type | Description |
| --- | --- |
| StreamResponse | Enum: Task, Message, StatusUpdate, ArtifactUpdate |
| TaskStatusUpdateEvent | Status change notification |
| TaskArtifactUpdateEvent | Artifact delivery notification |

Agent Card

| Type | Description |
| --- | --- |
| AgentCard | Root discovery document |
| AgentInterface | Transport endpoint descriptor |
| AgentCapabilities | Capability flags (streaming, push, extended card) |
| AgentSkill | Discrete agent capability |
| AgentProvider | Organization info |

Parameters

| Type | Description |
| --- | --- |
| MessageSendParams | SendMessage / SendStreamingMessage input |
| SendMessageConfiguration | Output modes, history, push config |
| TaskQueryParams | GetTask input |
| ListTasksParams | ListTasks input with filters and pagination |
| CancelTaskParams | CancelTask input |
| TaskIdParams | SubscribeToTask input |

Push Notifications

| Type | Description |
| --- | --- |
| TaskPushNotificationConfig | Webhook registration |
| AuthenticationInfo | Webhook auth credentials |

Responses

| Type | Description |
| --- | --- |
| SendMessageResponse | Enum: Task or Message |
| TaskListResponse | Paginated task list |

Errors

| Type | Description |
| --- | --- |
| A2aError | Protocol-level error |
| ErrorCode | Standard error codes |
| A2aResult<T> | Alias for Result<T, A2aError> |

JSON-RPC

| Type | Description |
| --- | --- |
| JsonRpcRequest | JSON-RPC 2.0 request envelope |
| JsonRpcError | JSON-RPC error object |
| JsonRpcVersion | Version marker ("2.0") |

Client (a2a-protocol-client)

Core Types

| Type | Description |
| --- | --- |
| A2aClient | Main client for calling remote agents |
| ClientBuilder | Fluent builder for client configuration |
| EventStream | Async SSE event stream |
| RetryPolicy | Configurable retry with exponential backoff |

Client Methods

| Method | Returns | Description |
| --- | --- | --- |
| send_message(params) | SendMessageResponse | Synchronous send |
| stream_message(params) | EventStream | Streaming send |
| get_task(params) | Task | Retrieve task by ID |
| list_tasks(params) | TaskListResponse | Query tasks |
| cancel_task(id) | Task | Cancel a running task |
| subscribe_to_task(id) | EventStream | Re-subscribe to task events |
| set_push_config(config) | TaskPushNotificationConfig | Create push config |
| get_push_config(task_id, id) | TaskPushNotificationConfig | Get push config |
| list_push_configs(params) | ListPushConfigsResponse | List push configs |
| delete_push_config(task_id, id) | () | Delete push config |
| get_authenticated_extended_card(params) | AgentCard | Get extended card |

Interceptors

| Type | Description |
| --- | --- |
| CallInterceptor | Request/response hook trait |
| InterceptorChain | Ordered interceptor sequence |

Transport

| Type | Description |
| --- | --- |
| Transport | Pluggable transport trait |
| JsonRpcTransport | JSON-RPC 2.0 transport |
| RestTransport | REST/HTTP transport |

Server (a2a-protocol-server)

Core Types

| Type | Description |
| --- | --- |
| RequestHandler | Central protocol orchestrator |
| RequestHandlerBuilder | Fluent builder for handler configuration |
| RequestContext | Information about the incoming request |

Traits

| Trait | Description |
| --- | --- |
| AgentExecutor | Agent logic entry point |
| TaskStore | Task persistence backend |
| PushConfigStore | Push config persistence |
| PushSender | Webhook delivery |
| ServerInterceptor | Server-side middleware |
| AgentCardProducer | Dynamic agent card generation |
| Dispatcher | HTTP dispatch trait (for serve()) |

Dispatchers

| Type | Description |
| --- | --- |
| JsonRpcDispatcher | JSON-RPC 2.0 HTTP dispatcher (implements Dispatcher) |
| RestDispatcher | RESTful HTTP dispatcher (implements Dispatcher) |

Server Startup

| Function | Description |
| --- | --- |
| serve(addr, dispatcher) | Bind + accept loop (blocking) |
| serve_with_addr(addr, dispatcher) | Bind + spawn, returns SocketAddr |

Built-in Implementations

| Type | Description |
| --- | --- |
| InMemoryTaskStore | In-memory task store with TTL |
| InMemoryPushConfigStore | In-memory push config store |
| HttpPushSender | HTTP webhook delivery with SSRF protection |
| StaticAgentCardHandler | Static agent card with HTTP caching |
| DynamicAgentCardHandler | Dynamic agent card with producer |

Streaming

| Type | Description |
| --- | --- |
| EventEmitter | Ergonomic event emission helper |
| EventQueueWriter | Write events to a stream |
| EventQueueReader | Read events from a stream |
| EventQueueManager | Manages stream lifecycle |
| InMemoryQueueWriter | Bounded channel writer |
| InMemoryQueueReader | Bounded channel reader |

Configuration

| Type | Description |
| --- | --- |
| CorsConfig | Cross-origin policy |
| TaskStoreConfig | TTL and capacity for in-memory store |
| ServerError | Server-level error type |
| ServerResult<T> | Alias for Result<T, ServerError> |

SDK (a2a-protocol-sdk)

Modules

| Module | Re-exports |
| --- | --- |
| a2a_protocol_sdk::types | All a2a-protocol-types exports |
| a2a_protocol_sdk::client | All a2a-protocol-client exports |
| a2a_protocol_sdk::server | All a2a-protocol-server exports |
| a2a_protocol_sdk::prelude | Most commonly used types |

Prelude Contents

The prelude includes the most commonly used types from all three crates — see Project Structure for the full list.

Constructors Cheatsheet

// Task status
TaskStatus::new(TaskState::Working)
TaskStatus::with_timestamp(TaskState::Completed)

// Messages and parts
Part::text("hello")
Part::file_bytes(base64_string)
Part::file_uri("https://...")
Part::data(json_value)

// Artifacts
Artifact::new("artifact-id", vec![Part::text("content")])

// IDs
TaskId::new("task-123")
ContextId::new("ctx-456")
MessageId::new("msg-789")

// Capabilities (non_exhaustive — use builder)
AgentCapabilities::none()
    .with_streaming(true)
    .with_push_notifications(false)

// Push configs
TaskPushNotificationConfig::new("task-id", "https://webhook.url")

Next Steps

Changelog

All notable changes to a2a-rust are documented in the project's CHANGELOG.md.

Versioning

a2a-rust follows Semantic Versioning:

  • Major (1.0.0) — Breaking API changes
  • Minor (0.2.0) — New features, backward compatible
  • Patch (0.2.1) — Bug fixes, backward compatible

All four workspace crates share the same version number and are released together.

Release Process

Releases are triggered by pushing a version tag:

git tag v0.2.0
git push origin v0.2.0

The release workflow automatically:

  1. Validates all crate versions match the tag
  2. Runs the full CI suite
  3. Publishes crates to crates.io in dependency order
  4. Creates a GitHub release with notes

Publish Order

a2a-protocol-types → a2a-protocol-client + a2a-protocol-server → a2a-protocol-sdk

This ensures each crate's dependencies are available before it publishes.

Latest (Unreleased)

  • Wave 2 inline unit tests — 110 new #[cfg(test)] tests added directly to 9 critical a2a-protocol-server source files (messaging, event_processing, push_config, lifecycle, handler/mod, REST/JSON-RPC/gRPC/WebSocket dispatchers). Total workspace test count: 1,255 passing tests.

Beyond-Spec Enhancements

  • OpenTelemetry metrics (otel feature) — OtelMetrics with native OTLP export
  • Connection pool metrics — ConnectionPoolStats and on_connection_pool_stats callback
  • Hot-reload agent cards — HotReloadAgentCardHandler with file polling and SIGHUP
  • Store migration tooling (sqlite feature) — MigrationRunner with V1–V3 built-in migrations
  • Per-tenant configuration — PerTenantConfig and TenantLimits for differentiated service levels
  • TenantResolver trait — HeaderTenantResolver, BearerTokenTenantResolver, PathSegmentTenantResolver
  • Agent card signing E2E — test 79 in agent-team suite (signing feature)

Bug Fixes (Passes 7–8)

  • Timeout errors now correctly classified as retryable (ClientError::Timeout)
  • SSE parser O(n) dequeue replaced with VecDeque for O(1) pop_front
  • Double-encoded path traversal bypass fixed with two-pass percent-decoding
  • gRPC stream errors now preserve protocol error codes
  • Rate limiter TOCTOU race fixed with CAS loop
  • Push config store now enforces global limits (DoS prevention)

v0.2.0 (2026-03-15)

Initial implementation of A2A v1.0.0 with all 11 protocol methods, dual transport (JSON-RPC + REST), SSE streaming, push notifications, agent card discovery, HTTP caching, enterprise hardening, and 600+ tests (workspace total now 1,255 after subsequent waves).

For the complete version history, see CHANGELOG.md.