Introduction
a2a-rust is a pure Rust implementation of the A2A (Agent-to-Agent) protocol v1.0.0 — an open standard for connecting AI agents over the network.
If you're building AI agents that need to talk to each other, discover capabilities, delegate tasks, and stream results — this library gives you the full protocol stack with zero unsafe code, compile-time type safety, and production-grade hardening.
What is the A2A Protocol?
The Agent-to-Agent protocol defines how AI agents discover, communicate with, and delegate work to each other. Think of it as HTTP for AI agents: a shared language that lets any agent talk to any other, regardless of implementation.
The protocol defines:
- Agent Cards — Discovery documents describing what an agent can do
- Tasks — Units of work with well-defined lifecycle states
- Messages — Structured payloads carrying text, data, or binary content
- Streaming — Real-time progress via Server-Sent Events (SSE)
- Push Notifications — Webhook delivery for async results
Why Rust?
Rust gives you performance, safety, and correctness without compromise:
- Zero-cost abstractions — Trait objects, generics, and async/await with no runtime overhead
- No `unsafe` — The entire codebase is free of unsafe code
- Thread safety at compile time — All public types implement `Send + Sync`
- Exhaustive pattern matching — The compiler catches missing protocol states
- Production-ready — Battle-tested HTTP via hyper, robust error handling, no panics in library code
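The exhaustive-matching point can be seen in a miniature, std-only sketch (the enum and function here are illustrative, not the library's actual `TaskState` type):

```rust
// Illustrative only: a miniature task-state enum, not the library's type.
#[derive(Debug, Clone, Copy, PartialEq)]
enum DemoState {
    Submitted,
    Working,
    Completed,
}

// Because this `match` is exhaustive, adding a new variant to DemoState
// turns it into a compile error until the new state is handled — a
// forgotten protocol state is caught before the code ever runs.
fn wire_name(state: DemoState) -> &'static str {
    match state {
        DemoState::Submitted => "TASK_STATE_SUBMITTED",
        DemoState::Working => "TASK_STATE_WORKING",
        DemoState::Completed => "TASK_STATE_COMPLETED",
    }
}
```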
Architecture at a Glance
a2a-rust is organized as a Cargo workspace with four crates:
┌─────────────────────────────────────────────┐
│ a2a-protocol-sdk │
│ umbrella re-exports + prelude │
├──────────────────────┬──────────────────────┤
│ a2a-protocol-client │ a2a-protocol-server │
│ HTTP client │ agent framework │
├──────────────────────┴──────────────────────┤
│ a2a-protocol-types │
│ wire types, serde, no I/O │
└─────────────────────────────────────────────┘
| Crate | Purpose |
|---|---|
| `a2a-protocol-types` | All A2A wire types with serde serialization. Pure data — no I/O, no async. |
| `a2a-protocol-client` | HTTP client for calling remote A2A agents. Supports JSON-RPC and REST transports. |
| `a2a-protocol-server` | Server framework for building A2A agents. Pluggable stores, interceptors, and dispatchers. |
| `a2a-protocol-sdk` | Umbrella crate that re-exports everything with a convenient prelude module. |
Key Features
- Full v1.0 wire types — Every A2A type with correct JSON serialization
- Quad transport — JSON-RPC 2.0, REST, WebSocket (`websocket` feature flag), and gRPC (`grpc` feature flag), both client and server
- SSE streaming — Real-time `SendStreamingMessage` and `SubscribeToTask`
- Push notifications — Pluggable `PushSender` with SSRF protection
- Agent card discovery — Static and dynamic card handlers with HTTP caching (ETag, Last-Modified, 304)
- Pluggable stores — `TaskStore` and `PushConfigStore` traits with in-memory, SQLite, and tenant-aware backends
- Multi-tenancy — `TenantAwareInMemoryTaskStore` and `TenantAwareSqliteTaskStore` with full tenant isolation
- Interceptor chains — Client and server middleware for auth, logging, metrics, rate limiting
- Rate limiting — Built-in `RateLimitInterceptor` with per-caller fixed-window limiting
- Client retry — Configurable `RetryPolicy` with exponential backoff for transient failures
- Server startup helper — `serve()` reduces ~25 lines of hyper boilerplate to one call
- Request ID propagation — `CallContext::request_id` auto-extracted from the `X-Request-ID` header
- Task store metrics — `TaskStore::count()` for monitoring and capacity management
- Task state machine — Validated transitions per the A2A specification
- Executor ergonomics — `boxed_future`, the `agent_executor!` macro, and `EventEmitter` reduce boilerplate
- Executor timeout — Kills hung agent tasks automatically
- CORS support — Configurable cross-origin policies
- Fully configurable — All defaults (timeouts, limits, intervals) are overridable via builders
- Mutation-tested — Zero surviving mutants enforced via a `cargo-mutants` CI gate
- No panics in library code — All fallible operations return `Result`
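As a rough model of the per-caller fixed-window limiting mentioned above, here is a simplified, std-only sketch — the names and structure are illustrative, not the library's `RateLimitInterceptor` API:

```rust
use std::collections::HashMap;

// Simplified model of per-caller fixed-window rate limiting.
// Not the library's implementation — an illustrative sketch only.
struct FixedWindow {
    limit: u32,
    window_ms: u64,
    // caller -> (window start, request count in that window)
    counters: HashMap<String, (u64, u32)>,
}

impl FixedWindow {
    fn allow(&mut self, caller: &str, now_ms: u64) -> bool {
        // Align the current time to the start of its window.
        let window_start = now_ms - (now_ms % self.window_ms);
        let entry = self
            .counters
            .entry(caller.to_string())
            .or_insert((window_start, 0));
        if entry.0 != window_start {
            // A new window has begun: reset the count.
            *entry = (window_start, 0);
        }
        if entry.1 < self.limit {
            entry.1 += 1;
            true
        } else {
            false
        }
    }
}
```

The time is injected as a parameter rather than read from a clock, which keeps the sketch deterministic and easy to test.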
All 11 Protocol Methods
| Method | Description |
|---|---|
| `SendMessage` | Synchronous message send; returns completed task |
| `SendStreamingMessage` | Streaming send with real-time SSE events |
| `GetTask` | Retrieve a task by ID |
| `ListTasks` | Query tasks with filtering and pagination |
| `CancelTask` | Request cancellation of a running task |
| `SubscribeToTask` | Re-subscribe to an existing task's event stream |
| `CreateTaskPushNotificationConfig` | Register a webhook for push delivery |
| `GetTaskPushNotificationConfig` | Retrieve a push config by ID |
| `ListTaskPushNotificationConfigs` | List all push configs for a task |
| `DeleteTaskPushNotificationConfig` | Remove a push config |
| `GetExtendedAgentCard` | Fetch authenticated agent card |
What's Next?
- Installation — Add a2a-rust to your project
- Quick Start — See the protocol in action in 5 minutes
- Your First Agent — Build an echo agent from scratch
- Protocol Overview — Understand the A2A protocol model
Installation
Requirements
- Rust 1.93+ (stable)
- A working internet connection for downloading crates
Adding to Your Project
The easiest way to use a2a-rust is through the umbrella SDK crate, which re-exports everything you need:
[dependencies]
a2a-protocol-sdk = "0.2"
tokio = { version = "1", features = ["full"] }
The SDK crate re-exports a2a-protocol-types, a2a-protocol-client, and a2a-protocol-server so you only need one dependency.
Individual Crates
If you prefer fine-grained control, depend on individual crates:
# Types only (no I/O, no async runtime)
a2a-protocol-types = "0.2"
# Client only
a2a-protocol-client = "0.2"
# Server only
a2a-protocol-server = "0.2"
This is useful when:
- You're building a client only and don't need server types
- You're building a server only and don't need client types
- You want to minimize compile times and dependency trees
Feature Flags
All features are off by default to minimize compile times and dependency trees.
a2a-protocol-types
| Feature | Description |
|---|---|
| `signing` | JWS/ES256 agent card signing (RFC 8785 canonicalization) |
a2a-protocol-client
| Feature | Description |
|---|---|
| `tls-rustls` | HTTPS via rustls (no OpenSSL required) |
| `signing` | Agent card signature verification |
| `tracing` | Structured logging via the tracing crate |
| `websocket` | WebSocket transport via tokio-tungstenite |
| `grpc` | gRPC transport via tonic |
a2a-protocol-server
| Feature | Description |
|---|---|
| `signing` | Agent card signing |
| `tracing` | Structured logging via the tracing crate |
| `sqlite` | SQLite-backed task and push config stores via sqlx |
| `websocket` | WebSocket transport via tokio-tungstenite |
| `grpc` | gRPC transport via tonic |
| `otel` | OpenTelemetry metrics via opentelemetry-otlp |
a2a-protocol-sdk (umbrella)
| Feature | Description |
|---|---|
| `signing` | Enables signing across types, client, and server |
| `tracing` | Enables tracing across client and server |
| `tls-rustls` | Enables HTTPS in the client |
| `grpc` | Enables gRPC across client and server |
| `otel` | Enables OpenTelemetry metrics in the server |
Enable features in your Cargo.toml:
[dependencies]
a2a-protocol-sdk = { version = "0.2", features = ["tracing", "signing"] }
# Or with individual crates:
a2a-protocol-server = { version = "0.2", features = ["tracing", "sqlite"] }
a2a-protocol-client = { version = "0.2", features = ["tls-rustls"] }
Verifying the Installation
Create a simple main.rs to verify everything compiles:
```rust
use a2a_protocol_sdk::prelude::*;

fn main() {
    // Create a task status
    let status = TaskStatus::new(TaskState::Submitted);
    println!("Task state: {:?}", status.state);

    // Create a message with a text part
    let part = Part::text("Hello, A2A!");
    println!("Part: {:?}", part);

    // Verify the agent capabilities builder
    let caps = AgentCapabilities::none()
        .with_streaming(true)
        .with_push_notifications(false);
    println!("Capabilities: {:?}", caps);
}
```
Run it:
cargo run
If this compiles and runs, you're ready to go.
Next Steps
- Quick Start — Run the echo agent example in 5 minutes
- Your First Agent — Build an agent from scratch
Quick Start
This guide gets you running the built-in echo agent example in under 5 minutes. The example demonstrates the full A2A stack: agent executor, dual-transport server, and client — all in a single binary.
Running the Echo Agent
Clone the repository and run the example:
git clone https://github.com/tomtom215/a2a-rust.git
cd a2a-rust
cargo run -p echo-agent
You'll see output like:
=== A2A Echo Agent Example ===
JSON-RPC server listening on http://127.0.0.1:54321
REST server listening on http://127.0.0.1:54322
--- Demo 1: Synchronous SendMessage (JSON-RPC) ---
Task ID: 550e8400-e29b-41d4-a716-446655440000
Status: Completed
Artifact: echo-artifact
Content: Echo: Hello from JSON-RPC client!
--- Demo 2: Streaming SendMessage (JSON-RPC) ---
Status update: Working
Artifact update: echo-artifact
Content: Echo: Hello from streaming client!
Status update: Completed
--- Demo 3: Synchronous SendMessage (REST) ---
Task ID: ...
Status: Completed
Content: Echo: Hello from REST client!
--- Demo 4: Streaming SendMessage (REST) ---
Status update: Working
Content: Echo: Hello from REST streaming!
Status update: Completed
--- Demo 5: GetTask ---
Fetched task: ... (Completed)
=== All demos completed successfully! ===
What Just Happened?
The example exercised all major protocol operations:
- Synchronous send (JSON-RPC) — Client sends a message, waits for the complete task
- Streaming send (JSON-RPC) — Client receives real-time SSE events as the agent works
- Synchronous send (REST) — Same operation over the REST transport
- Streaming send (REST) — SSE streaming over REST
- GetTask — Retrieves a previously completed task by ID
The Code in Brief
The echo agent is about 100 lines of Rust. Here's the core executor:
```rust
use a2a_protocol_sdk::prelude::*;
use std::future::Future;
use std::pin::Pin;

struct EchoExecutor;

impl AgentExecutor for EchoExecutor {
    fn execute<'a>(
        &'a self,
        ctx: &'a RequestContext,
        queue: &'a dyn EventQueueWriter,
    ) -> Pin<Box<dyn Future<Output = A2aResult<()>> + Send + 'a>> {
        Box::pin(async move {
            // 1. Transition to Working
            queue.write(StreamResponse::StatusUpdate(TaskStatusUpdateEvent {
                task_id: ctx.task_id.clone(),
                context_id: ContextId::new(ctx.context_id.clone()),
                status: TaskStatus::new(TaskState::Working),
                metadata: None,
            })).await?;

            // 2. Extract text from incoming message
            let input = ctx.message.parts.iter()
                .find_map(|p| match &p.content {
                    a2a_protocol_types::message::PartContent::Text { text } => Some(text.as_str()),
                    _ => None,
                })
                .unwrap_or("<no text>");

            // 3. Echo back as an artifact
            queue.write(StreamResponse::ArtifactUpdate(TaskArtifactUpdateEvent {
                task_id: ctx.task_id.clone(),
                context_id: ContextId::new(ctx.context_id.clone()),
                artifact: Artifact::new("echo-artifact", vec![Part::text(
                    &format!("Echo: {input}")
                )]),
                append: None,
                last_chunk: Some(true),
                metadata: None,
            })).await?;

            // 4. Transition to Completed
            queue.write(StreamResponse::StatusUpdate(TaskStatusUpdateEvent {
                task_id: ctx.task_id.clone(),
                context_id: ContextId::new(ctx.context_id.clone()),
                status: TaskStatus::new(TaskState::Completed),
                metadata: None,
            })).await?;

            Ok(())
        })
    }
}
```
The pattern is always: write status updates and artifacts to the event queue, then return Ok(()).
With Tracing
Enable structured logging to see the protocol internals:
cargo run -p echo-agent --features tracing
RUST_LOG=debug cargo run -p echo-agent --features tracing
Next Steps
- Your First Agent — Build your own agent from scratch
- Project Structure — Understand how the crates fit together
- The AgentExecutor Trait — Deep dive into the executor API
Your First Agent
This guide walks you through building a complete A2A agent from scratch — a calculator that evaluates simple arithmetic expressions.
Project Setup
Create a new binary crate:
cargo new my-agent
cd my-agent
Add dependencies to Cargo.toml:
[dependencies]
a2a-protocol-sdk = "0.2"
tokio = { version = "1", features = ["full"] }
uuid = { version = "1", features = ["v4"] }
Step 1: Define Your Executor
The AgentExecutor trait is the entry point for all agent logic. Implement it to define what your agent does when it receives a message:
```rust
use a2a_protocol_sdk::prelude::*;
use a2a_protocol_sdk::server::RequestContext;
use std::future::Future;
use std::pin::Pin;

struct CalcExecutor;

impl AgentExecutor for CalcExecutor {
    fn execute<'a>(
        &'a self,
        ctx: &'a RequestContext,
        queue: &'a dyn EventQueueWriter,
    ) -> Pin<Box<dyn Future<Output = A2aResult<()>> + Send + 'a>> {
        Box::pin(async move {
            // Signal that we're working
            queue.write(StreamResponse::StatusUpdate(TaskStatusUpdateEvent {
                task_id: ctx.task_id.clone(),
                context_id: ContextId::new(ctx.context_id.clone()),
                status: TaskStatus::new(TaskState::Working),
                metadata: None,
            })).await?;

            // Extract the expression from the message
            let expr = ctx.message.parts.iter()
                .find_map(|p| match &p.content {
                    a2a_protocol_types::message::PartContent::Text { text } => Some(text.clone()),
                    _ => None,
                })
                .unwrap_or_default();

            // Evaluate (very basic: just handle "a op b")
            let result = evaluate(&expr);

            // Send the result as an artifact
            queue.write(StreamResponse::ArtifactUpdate(TaskArtifactUpdateEvent {
                task_id: ctx.task_id.clone(),
                context_id: ContextId::new(ctx.context_id.clone()),
                artifact: Artifact::new("result", vec![Part::text(&result)]),
                append: None,
                last_chunk: Some(true),
                metadata: None,
            })).await?;

            // Done
            queue.write(StreamResponse::StatusUpdate(TaskStatusUpdateEvent {
                task_id: ctx.task_id.clone(),
                context_id: ContextId::new(ctx.context_id.clone()),
                status: TaskStatus::new(TaskState::Completed),
                metadata: None,
            })).await?;

            Ok(())
        })
    }
}

fn evaluate(expr: &str) -> String {
    // Toy parser: "3 + 5", "10 - 2", etc.
    let parts: Vec<&str> = expr.split_whitespace().collect();
    if parts.len() != 3 {
        return format!("Error: expected 'a op b', got '{expr}'");
    }
    let a: f64 = match parts[0].parse() {
        Ok(v) => v,
        Err(_) => return format!("Error: invalid number '{}'", parts[0]),
    };
    let b: f64 = match parts[2].parse() {
        Ok(v) => v,
        Err(_) => return format!("Error: invalid number '{}'", parts[2]),
    };
    match parts[1] {
        "+" => format!("{}", a + b),
        "-" => format!("{}", a - b),
        "*" => format!("{}", a * b),
        "/" if b != 0.0 => format!("{}", a / b),
        "/" => "Error: division by zero".into(),
        op => format!("Error: unknown operator '{op}'"),
    }
}
```
Step 2: Create the Agent Card
The agent card tells clients what your agent can do:
```rust
use a2a_protocol_sdk::types::agent_card::*;

fn make_agent_card(url: &str) -> AgentCard {
    AgentCard {
        name: "Calculator Agent".into(),
        description: "Evaluates simple arithmetic expressions".into(),
        version: "1.0.0".into(),
        supported_interfaces: vec![AgentInterface {
            url: url.into(),
            protocol_binding: "JSONRPC".into(),
            protocol_version: "1.0.0".into(),
            tenant: None,
        }],
        default_input_modes: vec!["text/plain".into()],
        default_output_modes: vec!["text/plain".into()],
        skills: vec![AgentSkill {
            id: "calc".into(),
            name: "Calculator".into(),
            description: "Evaluates expressions like '3 + 5'".into(),
            tags: vec!["math".into(), "calculator".into()],
            examples: Some(vec![
                "3 + 5".into(),
                "10 * 2".into(),
                "100 / 4".into(),
            ]),
            input_modes: None,
            output_modes: None,
            security_requirements: None,
        }],
        capabilities: AgentCapabilities::none()
            .with_streaming(true)
            .with_push_notifications(false),
        provider: None,
        icon_url: None,
        documentation_url: None,
        security_schemes: None,
        security_requirements: None,
        signatures: None,
    }
}
```
Step 3: Wire Up the Server
Build the request handler and start an HTTP server:
```rust
use a2a_protocol_sdk::server::{RequestHandlerBuilder, JsonRpcDispatcher};
use std::sync::Arc;

#[tokio::main]
async fn main() {
    // Build the handler with our executor and agent card
    let handler = Arc::new(
        RequestHandlerBuilder::new(CalcExecutor)
            .with_agent_card(make_agent_card("http://localhost:3000"))
            .build()
            .expect("build handler"),
    );

    // Create the JSON-RPC dispatcher
    let dispatcher = Arc::new(JsonRpcDispatcher::new(handler));

    // Start the HTTP server
    let listener = tokio::net::TcpListener::bind("127.0.0.1:3000")
        .await
        .expect("bind");
    println!("Calculator agent listening on http://127.0.0.1:3000");

    loop {
        let (stream, _) = listener.accept().await.expect("accept");
        let io = hyper_util::rt::TokioIo::new(stream);
        let dispatcher = Arc::clone(&dispatcher);
        tokio::spawn(async move {
            let service = hyper::service::service_fn(move |req| {
                let d = Arc::clone(&dispatcher);
                async move { Ok::<_, std::convert::Infallible>(d.dispatch(req).await) }
            });
            let _ = hyper_util::server::conn::auto::Builder::new(
                hyper_util::rt::TokioExecutor::new(),
            )
            .serve_connection(io, service)
            .await;
        });
    }
}
```
Step 4: Test with a Client
In a separate terminal (or in the same binary), create a client:
```rust
use a2a_protocol_sdk::prelude::*;
use a2a_protocol_sdk::client::ClientBuilder;

#[tokio::main]
async fn main() {
    let client = ClientBuilder::new("http://127.0.0.1:3000".to_string())
        .build()
        .expect("build client");

    let params = MessageSendParams {
        tenant: None,
        message: Message {
            id: MessageId::new(uuid::Uuid::new_v4().to_string()),
            role: MessageRole::User,
            parts: vec![Part::text("42 + 58")],
            task_id: None,
            context_id: None,
            reference_task_ids: None,
            extensions: None,
            metadata: None,
        },
        configuration: None,
        metadata: None,
    };

    match client.send_message(params).await.unwrap() {
        SendMessageResponse::Task(task) => {
            println!("Result: {:?}", task.status.state);
            if let Some(artifacts) = &task.artifacts {
                for art in artifacts {
                    for part in &art.parts {
                        if let a2a_protocol_types::message::PartContent::Text { text } =
                            &part.content
                        {
                            println!("Answer: {text}");
                        }
                    }
                }
            }
        }
        other => println!("Unexpected response: {other:?}"),
    }
}
```
Output:
Result: Completed
Answer: 100
The Three-Event Pattern
Almost every executor follows this pattern:
- Status → Working — Signal that processing has started
- ArtifactUpdate — Deliver results (one or more artifacts)
- Status → Completed — Signal that processing is done
For streaming clients, these arrive as individual SSE events. For synchronous clients, the handler collects them into a final Task response.
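That collection step can be pictured with a std-only sketch — the event enum below is a hypothetical stand-in for the library's `StreamResponse`, reduced to the two event kinds the pattern uses:

```rust
// Hypothetical, simplified event model — not the library's StreamResponse.
#[derive(Debug, Clone, PartialEq)]
enum DemoEvent {
    Status(&'static str),
    Artifact(String),
}

// Fold a stream of events into (final status, collected artifacts),
// mirroring what a synchronous handler does before returning a final Task.
fn collect(events: &[DemoEvent]) -> (&'static str, Vec<String>) {
    let mut status = "submitted";
    let mut artifacts = Vec::new();
    for event in events {
        match event {
            DemoEvent::Status(s) => status = s,     // last status wins
            DemoEvent::Artifact(a) => artifacts.push(a.clone()),
        }
    }
    (status, artifacts)
}
```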
Next Steps
- Project Structure — Understand how the crates fit together
- The AgentExecutor Trait — Advanced executor patterns
- Request Handler & Builder — Configuration options
Project Structure
a2a-rust is organized as a Cargo workspace with four crates, each with a clear responsibility. Understanding this structure helps you choose the right dependency for your use case.
Workspace Layout
a2a-rust/
├── Cargo.toml # Workspace root
├── crates/
│ ├── a2a-protocol-types/ # Wire types (serde, no I/O)
│ │ └── src/
│ │ ├── lib.rs
│ │ ├── task.rs # Task, TaskState, TaskStatus, TaskId
│ │ ├── message.rs # Message, Part, PartContent, MessageRole
│ │ ├── artifact.rs # Artifact, ArtifactId
│ │ ├── agent_card.rs # AgentCard, AgentInterface, AgentSkill
│ │ ├── events.rs # StreamResponse, status/artifact events
│ │ ├── params.rs # MessageSendParams, TaskQueryParams, ...
│ │ ├── responses.rs # SendMessageResponse, TaskListResponse
│ │ ├── jsonrpc.rs # JSON-RPC 2.0 envelope types
│ │ ├── push.rs # Push notification config types
│ │ ├── error.rs # A2aError, ErrorCode, A2aResult
│ │ ├── security.rs # Security schemes and requirements
│ │ └── extensions.rs # Extension and signing types
│ │
│ ├── a2a-protocol-client/ # HTTP client
│ │ └── src/
│ │ ├── lib.rs # A2aClient, ClientBuilder
│ │ ├── builder/ # ClientBuilder (fluent config)
│ │ │ ├── mod.rs # Builder struct, configuration setters
│ │ │ └── transport_factory.rs # build() / build_grpc() assembly
│ │ ├── transport/ # Transport trait + implementations
│ │ │ ├── mod.rs # Transport trait, truncate_body
│ │ │ ├── rest/ # REST transport
│ │ │ │ ├── mod.rs # RestTransport struct, constructors
│ │ │ │ ├── request.rs # URI/request building, execution
│ │ │ │ ├── streaming.rs # SSE streaming, body reader
│ │ │ │ ├── routing.rs # Route definitions, method mapping
│ │ │ │ └── query.rs # Query string building, encoding
│ │ │ └── jsonrpc.rs # JsonRpcTransport
│ │ ├── streaming/ # SSE parser, EventStream
│ │ │ ├── event_stream.rs # EventStream for consuming SSE
│ │ │ └── sse_parser/ # SSE frame parser
│ │ │ ├── mod.rs # Re-exports
│ │ │ ├── types.rs # SseFrame, SseParseError
│ │ │ └── parser.rs # SseParser state machine
│ │ ├── methods/ # send_message, tasks, push_config
│ │ ├── auth.rs # CredentialsStore, AuthInterceptor
│ │ └── interceptor.rs # CallInterceptor, InterceptorChain
│ │
│ ├── a2a-protocol-server/ # Server framework
│ │ └── src/
│ │ ├── lib.rs # Public re-exports
│ │ ├── handler/ # RequestHandler (core orchestration)
│ │ │ ├── mod.rs # Struct definition, SendMessageResult
│ │ │ ├── limits.rs # HandlerLimits config
│ │ │ ├── messaging.rs # SendMessage / SendStreamingMessage
│ │ │ ├── lifecycle/ # Task lifecycle handlers
│ │ │ │ ├── mod.rs # Re-exports
│ │ │ │ ├── get_task.rs # GetTask handler
│ │ │ │ ├── list_tasks.rs # ListTasks handler
│ │ │ │ ├── cancel_task.rs # CancelTask handler
│ │ │ │ ├── subscribe.rs # SubscribeToTask handler
│ │ │ │ └── extended_card.rs # GetExtendedAgentCard handler
│ │ │ ├── push_config.rs # Push notification config CRUD
│ │ │ ├── event_processing/ # Event collection & push delivery
│ │ │ │ ├── mod.rs # Re-exports
│ │ │ │ ├── sync_collector.rs # Sync-mode event collection
│ │ │ │ └── background/ # Background event processor
│ │ │ │ ├── mod.rs # Event loop orchestration
│ │ │ │ ├── state_machine.rs # Event dispatch, state transitions
│ │ │ │ └── push_delivery.rs # Push notification delivery
│ │ │ ├── shutdown.rs # Graceful shutdown
│ │ │ └── helpers.rs # Validation, context builders
│ │ ├── builder.rs # RequestHandlerBuilder
│ │ ├── executor.rs # AgentExecutor trait
│ │ ├── executor_helpers.rs # boxed_future, agent_executor!, EventEmitter
│ │ ├── dispatch/ # Protocol dispatchers
│ │ │ ├── mod.rs # DispatchConfig, re-exports
│ │ │ ├── rest/ # REST dispatcher
│ │ │ │ ├── mod.rs # RestDispatcher, route handlers
│ │ │ │ ├── response.rs # HTTP response helpers
│ │ │ │ └── query.rs # Query/URL parsing utilities
│ │ │ ├── jsonrpc/ # JSON-RPC 2.0 dispatcher
│ │ │ │ ├── mod.rs # JsonRpcDispatcher, dispatch logic
│ │ │ │ └── response.rs # JSON-RPC response serialization
│ │ │ └── grpc/ # gRPC dispatcher (feature-gated)
│ │ │ ├── mod.rs # Proto includes, re-exports
│ │ │ ├── config.rs # GrpcConfig
│ │ │ ├── dispatcher.rs # GrpcDispatcher, server setup
│ │ │ ├── service.rs # A2aService trait implementation
│ │ │ └── helpers.rs # JSON codec, error mapping
│ │ ├── store/ # Task persistence
│ │ │ ├── mod.rs # Re-exports
│ │ │ ├── task_store/ # TaskStore trait + in-memory impl
│ │ │ │ ├── mod.rs # TaskStore trait, TaskStoreConfig
│ │ │ │ └── in_memory/ # InMemoryTaskStore
│ │ │ │ ├── mod.rs # Core CRUD, TaskStore impl
│ │ │ │ └── eviction.rs # TTL + capacity eviction
│ │ │ └── tenant/ # Multi-tenant isolation
│ │ │ ├── mod.rs # Re-exports
│ │ │ ├── context.rs # TenantContext (task-local)
│ │ │ └── store.rs # TenantAwareInMemoryTaskStore
│ │ ├── push/ # PushConfigStore, PushSender
│ │ ├── streaming/ # Event streaming
│ │ │ ├── mod.rs # Re-exports
│ │ │ ├── sse.rs # SSE response builder
│ │ │ └── event_queue/ # Event queue system
│ │ │ ├── mod.rs # Traits, constants, constructors
│ │ │ ├── in_memory.rs # Broadcast-backed queue impl
│ │ │ └── manager.rs # EventQueueManager
│ │ ├── agent_card/ # Static/Dynamic card handlers
│ │ ├── call_context.rs # CallContext with HTTP headers
│ │ ├── metrics.rs # Metrics trait
│ │ ├── rate_limit.rs # RateLimitInterceptor, RateLimitConfig
│ │ ├── serve.rs # serve(), serve_with_addr() helpers
│ │ └── request_context.rs # RequestContext
│ │
│ └── a2a-protocol-sdk/ # Umbrella crate
│ └── src/
│ └── lib.rs # Re-exports + prelude
│
├── examples/
│ └── echo-agent/ # Working example agent
│
├── docs/
│ └── adr/ # Architecture Decision Records
│
└── fuzz/ # Fuzz testing targets
Crate Dependencies
a2a-protocol-sdk
├── a2a-protocol-client
│ └── a2a-protocol-types
├── a2a-protocol-server
│ └── a2a-protocol-types
└── a2a-protocol-types
The dependency graph is intentionally shallow:
- `a2a-protocol-types` has no internal dependencies — just `serde` and `serde_json`
- `a2a-protocol-client` depends on `a2a-protocol-types` plus HTTP crates (`hyper`, `hyper-util`, `http-body-util`)
- `a2a-protocol-server` depends on `a2a-protocol-types` plus the same HTTP stack
- `a2a-protocol-sdk` depends on all three, adding nothing of its own
Choosing Your Dependency
| Use Case | Crate |
|---|---|
| Just want the types (e.g., for a custom transport) | a2a-protocol-types |
| Building a client that calls remote agents | a2a-protocol-client |
| Building an agent (server) | a2a-protocol-server |
| Building both client and server | a2a-protocol-sdk |
| Quick prototyping / examples | a2a-protocol-sdk (use the prelude) |
The Prelude
The SDK's prelude module exports the most commonly used types so you can get started with a single import:
```rust
use a2a_protocol_sdk::prelude::*;
```
This gives you:
- Core types: `Task`, `TaskState`, `TaskStatus`, `Message`, `Part`, `Artifact`, `ArtifactId`
- ID types: `TaskId`, `ContextId`, `MessageId`, `MessageRole`
- Events: `StreamResponse`, `TaskStatusUpdateEvent`, `TaskArtifactUpdateEvent`
- Agent card: `AgentCard`, `AgentInterface`, `AgentCapabilities`, `AgentSkill`
- Params: `MessageSendParams`, `TaskQueryParams`
- Responses: `SendMessageResponse`, `TaskListResponse`
- Errors: `A2aError`, `A2aResult`, `ClientError`, `ClientResult`, `ServerError`, `ServerResult`
- Client: `A2aClient`, `ClientBuilder`, `EventStream`, `RetryPolicy`
- Server: `AgentExecutor`, `RequestHandler`, `RequestHandlerBuilder`, `RequestContext`, `EventQueueWriter`, `EventEmitter`, `Dispatcher`
- Dispatchers: `JsonRpcDispatcher`, `RestDispatcher`
- Utilities: `serve`, `serve_with_addr`, `RateLimitInterceptor`, `RateLimitConfig`
External Dependencies
a2a-rust keeps its dependency tree lean:
| Dependency | Used For |
|---|---|
| `serde` / `serde_json` | JSON serialization |
| `hyper` / `hyper-util` | HTTP/1.1 and HTTP/2 |
| `http-body-util` | HTTP body utilities |
| `tokio` | Async runtime |
| `uuid` | Task and message ID generation |
| `bytes` | Efficient byte buffers |
No web framework (axum, actix, warp) is required — the dispatchers work directly with hyper.
Next Steps
- Protocol Overview — Understand the A2A protocol model
- The AgentExecutor Trait — The core of agent implementation
Protocol Overview
The A2A (Agent-to-Agent) protocol defines how AI agents discover each other, exchange messages, manage task lifecycles, and stream results. This page covers the conceptual model — the "what" before the "how."
The Big Picture
An A2A interaction follows this flow:
Client Agent Agent (Server)
│ │
│ 1. GET /agent.json │
│ ─────────────────────────►│
│ AgentCard │
│ ◄─────────────────────────│
│ │
│ 2. SendMessage │
│ ─────────────────────────►│
│ Task (or SSE stream) │
│ ◄─────────────────────────│
│ │
│ 3. GetTask │
│ ─────────────────────────►│
│ Task │
│ ◄─────────────────────────│
│ │
- Discovery — The client fetches the agent's card from `/.well-known/agent.json`
- Communication — The client sends a message and receives results
- Management — The client can query, cancel, or subscribe to tasks
Core Entities
Tasks
A Task is the central unit of work. When a client sends a message, the server creates a task that progresses through well-defined states:
┌───────────┐
┌──────│ Submitted │──────┐
│ └─────┬─────┘ │
│ │ │
▼ ▼ ▼
┌────────┐ ┌────────┐ ┌──────────┐
┌───►│Working │ │Failed *│ │Rejected *│
│ └───┬────┘ └────────┘ └──────────┘
│ │
│ ┌────┼──────────┬───────────┐
│ │ │ │ │
│ ▼ ▼ ▼ ▼
│ ┌───────────┐ ┌─────────┐ ┌──────────┐
│ │Completed *│ │ Input │ │ Auth │
│ └───────────┘ │Required │ │ Required │
│ └────┬────┘ └────┬─────┘
│ │ │
└────────────────────┴───────────┘
(InputRequired and AuthRequired
also transition to Failed/Canceled)
* = terminal state (no further transitions)
Terminal states (Completed, Failed, Canceled, Rejected) are final — no further transitions are allowed.
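A simplified, std-only transition check corresponding to the diagram might look like the following — the enum and rules here are illustrative (the crate's actual validator follows the A2A specification, which may permit transitions this sketch omits):

```rust
#[derive(Debug, Clone, Copy, PartialEq)]
enum State {
    Submitted, Working, InputRequired, AuthRequired,
    Completed, Failed, Canceled, Rejected,
}

fn is_terminal(s: State) -> bool {
    matches!(s, State::Completed | State::Failed | State::Canceled | State::Rejected)
}

// Simplified validation of the transitions shown in the diagram above.
fn can_transition(from: State, to: State) -> bool {
    use State::*;
    match from {
        Submitted => matches!(to, Working | Failed | Rejected),
        Working => matches!(to, Completed | InputRequired | AuthRequired | Failed | Canceled),
        InputRequired | AuthRequired => matches!(to, Working | Failed | Canceled),
        // Terminal states admit no further transitions.
        Completed | Failed | Canceled | Rejected => false,
    }
}
```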
Messages
A Message is a structured payload sent between agents. Each message has:
- A unique ID (`MessageId`)
- A role — `User` (from the client) or `Agent` (from the server)
- One or more Parts — the actual content
Parts
A Part is a content unit within a message. Three content types are supported:
| Type | Description | Example |
|---|---|---|
| `Text` | Plain text | `"Summarize this document"` |
| `File` | Inline bytes or URI reference | Image data, `https://example.com/doc.pdf` |
| `Data` | Structured JSON | `{"table": [...], "columns": [...]}` |
Artifacts
An Artifact is a result produced by an agent. Like messages, artifacts contain parts. Unlike messages, artifacts belong to a task and can be delivered incrementally via streaming.
Agent Cards
An Agent Card is the discovery document that describes an agent — its name, capabilities, skills, and how to connect. Think of it as a machine-readable business card.
Request/Response Model
A2A supports two communication styles:
Synchronous (SendMessage)
The client sends a message and blocks until the task is complete:
Client Server
│ │
│ SendMessage │
│ ───────────────────────────►│
│ Executor runs,
│ collects events
│ Task │
│ ◄───────────────────────────│
│ │
Streaming (SendStreamingMessage)
The client sends a message and receives events in real time via SSE:
Client Server
│ │
│ SendStreamingMessage │
│ ───────────────────────────►│
│ StatusUpdate: Working │
│ ◄───────────────────────────│
│ ArtifactUpdate │
│ ◄───────────────────────────│
│ ArtifactUpdate │
│ ◄───────────────────────────│
│ StatusUpdate: Completed │
│ ◄───────────────────────────│
│ │
Streaming is ideal for long-running tasks where the client wants progress updates.
Contexts and Conversations
Tasks exist within a Context — a conversation thread. Multiple tasks can share the same context, allowing agents to maintain conversational state across interactions.
When a client sends a message with a context_id, the server groups that task with previous tasks in the same context. If no context_id is provided, the server creates a new one.
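One way to picture that grouping is a std-only sketch with hypothetical types — this is not the library's store, and a real server would mint UUIDs rather than counter-based ids:

```rust
use std::collections::HashMap;

// Hypothetical sketch: group task ids by conversation context, minting a
// new context id when the client did not supply one.
struct Contexts {
    next: u64,
    by_context: HashMap<String, Vec<String>>,
}

impl Contexts {
    fn assign(&mut self, task_id: &str, context_id: Option<&str>) -> String {
        let ctx = match context_id {
            Some(c) => c.to_string(),
            None => {
                // No context supplied: create a fresh one.
                self.next += 1;
                format!("ctx-{}", self.next)
            }
        };
        self.by_context
            .entry(ctx.clone())
            .or_default()
            .push(task_id.to_string());
        ctx
    }
}
```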
Multi-Tenancy
A2A supports multi-tenancy via an optional tenant field on all requests. This allows a single agent server to serve multiple isolated tenants, each with their own tasks and configurations.
In the REST transport, tenancy is expressed as a path prefix: /tenants/{tenant-id}/tasks/...
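For example, building a task URL with and without a tenant might look like this (an illustrative helper, not part of the a2a-rust API):

```rust
// Illustrative helper — not part of the a2a-rust API.
fn task_path(tenant: Option<&str>, task_id: &str) -> String {
    match tenant {
        // Tenancy is expressed as a path prefix in the REST transport.
        Some(t) => format!("/tenants/{t}/tasks/{task_id}"),
        None => format!("/tasks/{task_id}"),
    }
}
```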
Next Steps
- Transport Layers — JSON-RPC vs REST, and when to use each
- Agent Cards & Discovery — How agents describe themselves
- Tasks & Messages — Deep dive into the data model
Transport Layers
A2A supports four transport bindings: JSON-RPC 2.0, REST, WebSocket (websocket feature flag), and gRPC (grpc feature flag). All four are first-class citizens in a2a-rust — the server can serve multiple transports simultaneously, and the client auto-selects based on the agent card.
JSON-RPC 2.0
The JSON-RPC transport sends all requests to a single endpoint as POST requests with a JSON-RPC 2.0 envelope:
{
"jsonrpc": "2.0",
"method": "SendMessage",
"id": "req-1",
"params": {
"message": {
"messageId": "msg-1",
"role": "ROLE_USER",
"parts": [{"text": "Hello, agent!"}]
}
}
}
Response:
{
"jsonrpc": "2.0",
"id": "req-1",
"result": {
"id": "task-abc",
"contextId": "ctx-123",
"status": { "state": "TASK_STATE_COMPLETED" },
"artifacts": [...]
}
}
Method Names
| A2A Operation | JSON-RPC Method |
|---|---|
| Send message | SendMessage |
| Stream message | SendStreamingMessage |
| Get task | GetTask |
| List tasks | ListTasks |
| Cancel task | CancelTask |
| Subscribe to task | SubscribeToTask |
| Create push config | CreateTaskPushNotificationConfig |
| Get push config | GetTaskPushNotificationConfig |
| List push configs | ListTaskPushNotificationConfigs |
| Delete push config | DeleteTaskPushNotificationConfig |
| Extended card | GetExtendedAgentCard |
Batching
JSON-RPC supports batch requests — multiple operations in a single HTTP request:
[
{"jsonrpc": "2.0", "method": "GetTask", "id": "1", "params": {"id": "task-a"}},
{"jsonrpc": "2.0", "method": "GetTask", "id": "2", "params": {"id": "task-b"}}
]
Note: Streaming methods (`SendStreamingMessage`, `SubscribeToTask`) cannot be used in batch requests and will return an error.
ID Handling
JSON-RPC request IDs can be strings, numbers (including 0 and floats), or null. The server preserves the exact ID type in the response.
REST
The REST transport uses standard HTTP methods and URL paths:
| Operation | Method | Path |
|---|---|---|
| Send message | POST | /message:send |
| Stream message | POST | /message:stream |
| Get task | GET | /tasks/{id} |
| List tasks | GET | /tasks |
| Cancel task | POST | /tasks/{id}:cancel |
| Subscribe | GET | /tasks/{id}:subscribe |
| Create push config | POST | /tasks/{id}/pushNotificationConfigs |
| Get push config | GET | /tasks/{id}/pushNotificationConfigs/{configId} |
| List push configs | GET | /tasks/{id}/pushNotificationConfigs |
| Delete push config | DELETE | /tasks/{id}/pushNotificationConfigs/{configId} |
| Agent card | GET | /.well-known/agent.json |
Multi-Tenant Paths
With tenancy, paths are prefixed: `/tenants/{tenant-id}/tasks/{id}`
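As a sketch of how the tenant prefix composes with the resource paths in the table above (illustrative only, not the library's routing code):

```rust
/// Hypothetical helper that builds a REST task path, optionally scoped
/// to a tenant and optionally using the `:action` custom-method style
/// (e.g. POST /tasks/{id}:cancel). Not part of a2a-rust.
fn task_path(tenant: Option<&str>, task_id: &str, action: Option<&str>) -> String {
    let prefix = match tenant {
        Some(t) => format!("/tenants/{t}"),
        None => String::new(),
    };
    match action {
        Some(a) => format!("{prefix}/tasks/{task_id}:{a}"),
        None => format!("{prefix}/tasks/{task_id}"),
    }
}
```

The same prefix rule applies uniformly to every row in the REST table, which is why a single helper like this can cover both tenanted and untenanted deployments.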
Content Types
The REST dispatcher accepts both application/json and application/a2a+json.
Security
The REST dispatcher includes built-in protections:
- Path traversal rejection — `..` in path segments (including percent-encoded `%2E%2E`) returns 400
- Query string limits — Query strings over 4 KiB return 414
- Body size limits — Request bodies over 4 MiB return 413
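The traversal check described above can be sketched in a few lines. This is an illustration of the rule, not the dispatcher's actual code:

```rust
/// Reject any path segment that is ".." either literally or
/// percent-encoded ("%2E%2E", case-insensitive). Illustrative sketch only;
/// a production decoder handles full percent-decoding, not just %2E.
fn is_traversal_attempt(path: &str) -> bool {
    path.split('/').any(|seg| {
        // Normalize the one encoding this check cares about.
        let decoded = seg.to_ascii_lowercase().replace("%2e", ".");
        decoded == ".."
    })
}
```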
WebSocket
The WebSocket transport (websocket feature flag) provides a persistent bidirectional channel over a single TCP connection. JSON-RPC 2.0 messages are exchanged as WebSocket text frames.
```toml
# Server
a2a-protocol-server = { version = "0.2", features = ["websocket"] }

# Client
a2a-protocol-client = { version = "0.2", features = ["websocket"] }
```
Server
```rust
use a2a_protocol_server::{WebSocketDispatcher, RequestHandlerBuilder};
use std::sync::Arc;

let handler = Arc::new(RequestHandlerBuilder::new(my_executor).build().unwrap());
let dispatcher = Arc::new(WebSocketDispatcher::new(handler));

// Start accepting WebSocket connections
dispatcher.serve("0.0.0.0:3002").await?;
```
Protocol
- Client sends JSON-RPC 2.0 requests as text frames
- Server responds with JSON-RPC 2.0 responses as text frames
- For streaming methods (`SendStreamingMessage`, `SubscribeToTask`), the server sends multiple frames — one per event — followed by a `stream_complete` response
- Ping/pong frames are handled automatically
- Connection closes cleanly on WebSocket close frame
Client
```rust
use a2a_protocol_client::WebSocketTransport;

let transport = WebSocketTransport::connect("ws://agent.example.com:3002").await?;
let client = ClientBuilder::new("ws://agent.example.com:3002")
    .with_custom_transport(transport)
    .build()?;
```
When to Use WebSocket
- Long-lived connections — Avoids TCP/TLS handshake overhead per request
- Bidirectional streaming — Server can push events without SSE
- Low latency — No HTTP framing overhead for small messages
gRPC
The gRPC transport (grpc feature flag) provides high-performance RPC via protocol buffers and HTTP/2. JSON payloads are carried inside protobuf bytes fields, reusing all existing serde types — no duplicate type definitions.
```toml
# Server
a2a-protocol-server = { version = "0.2", features = ["grpc"] }

# Client
a2a-protocol-client = { version = "0.2", features = ["grpc"] }
```
Server
```rust
use a2a_protocol_server::{GrpcDispatcher, GrpcConfig};
use std::sync::Arc;

let handler = Arc::new(RequestHandlerBuilder::new(my_executor).build().unwrap());
let config = GrpcConfig::default()
    .with_max_message_size(8 * 1024 * 1024);
let dispatcher = GrpcDispatcher::new(handler, config);
dispatcher.serve("0.0.0.0:50051").await?;
```
Tip: Use `serve_with_listener()` when you need to know the server address before constructing the handler (e.g., for agent cards with correct URLs). Pre-bind a `TcpListener`, extract the address, build your handler, then pass the listener.
Client
```rust
use a2a_protocol_client::GrpcTransport;

let transport = GrpcTransport::connect("http://agent.example.com:50051").await?;
let client = ClientBuilder::new("http://agent.example.com:50051")
    .with_custom_transport(transport)
    .build()?;
```
Protocol
- All 11 A2A methods are mapped to gRPC RPCs
- Streaming methods (`SendStreamingMessage`, `SubscribeToTask`) use gRPC server streaming
- JSON payloads are wrapped in `JsonPayload { bytes data = 1 }` protobuf messages
- The proto definition is at `proto/a2a.proto`
When to Use gRPC
- Service mesh integration — gRPC is native to Kubernetes, Istio, Envoy
- Language interop — gRPC has code generation for 10+ languages
- HTTP/2 multiplexing — Multiple RPCs over a single connection
- Streaming — Native server streaming without SSE
Choosing a Transport
| Factor | JSON-RPC | REST | WebSocket | gRPC |
|---|---|---|---|---|
| Batch operations | Supported | Not supported | Not supported | Not supported |
| Caching | Limited (POST-only) | HTTP cache-friendly (GET) | Not applicable | Not applicable |
| Tooling | Needs JSON-RPC client | Standard HTTP tools work | WebSocket client needed | gRPC client needed |
| URL structure | Single endpoint | Resource-oriented | Single connection | Single connection |
| Streaming | SSE via POST | SSE via POST/GET | Native text frames | Native server streaming |
| Connection reuse | HTTP keep-alive | HTTP keep-alive | Persistent connection | HTTP/2 multiplexing |
JSON-RPC and REST use SSE for streaming. WebSocket uses native text frames. gRPC uses native server streaming over HTTP/2. The choice is mostly about ecosystem fit — JSON-RPC for agent-to-agent communication, REST for standard HTTP tooling, WebSocket for persistent low-latency connections, gRPC for service mesh and cross-language interop.
Running Both Transports
The server can serve multiple transports simultaneously on different ports; here, JSON-RPC and REST:
```rust
use a2a_protocol_sdk::server::{JsonRpcDispatcher, RestDispatcher, RequestHandlerBuilder};
use std::sync::Arc;

let handler = Arc::new(
    RequestHandlerBuilder::new(my_executor).build().unwrap()
);

// JSON-RPC on port 3000
let jsonrpc = Arc::new(JsonRpcDispatcher::new(Arc::clone(&handler)));

// REST on port 3001
let rest = Arc::new(RestDispatcher::new(handler));
```
All dispatchers share the same RequestHandler, which means they share the same task store, push config store, and executor.
Next Steps
- Agent Cards & Discovery — How transport URLs are advertised
- Streaming with SSE — How real-time events work across transports
- Dispatchers — Server-side dispatcher configuration
Agent Cards & Discovery
An Agent Card is the machine-readable discovery document that describes an A2A agent. Clients fetch the card to learn what the agent can do, how to connect, and what security is required.
The Agent Card
Agent cards are served at /.well-known/agent.json and contain:
```json
{
  "name": "Calculator Agent",
  "description": "Evaluates arithmetic expressions",
  "version": "1.0.0",
  "supportedInterfaces": [
    {
      "url": "https://agent.example.com/rpc",
      "protocolBinding": "JSONRPC",
      "protocolVersion": "1.0.0"
    },
    {
      "url": "https://agent.example.com/api",
      "protocolBinding": "REST",
      "protocolVersion": "1.0.0"
    }
  ],
  "defaultInputModes": ["text/plain"],
  "defaultOutputModes": ["text/plain"],
  "skills": [
    {
      "id": "calc",
      "name": "Calculator",
      "description": "Evaluates expressions like '3 + 5'",
      "tags": ["math", "calculator"],
      "examples": ["3 + 5", "10 * 2"]
    }
  ],
  "capabilities": {
    "streaming": true,
    "pushNotifications": false
  }
}
```
Required Fields
| Field | Description |
|---|---|
| `name` | Human-readable agent name |
| `description` | What the agent does |
| `version` | Semantic version of this agent |
| `supportedInterfaces` | At least one interface with URL and protocol binding |
| `defaultInputModes` | MIME types accepted (e.g., `["text/plain"]`) |
| `defaultOutputModes` | MIME types produced |
| `skills` | At least one skill describing a capability |
| `capabilities` | Capability flags (streaming, push, etc.) |
Skills
Skills describe discrete capabilities:
```rust
use a2a_protocol_sdk::types::agent_card::AgentSkill;

let skill = AgentSkill {
    id: "summarize".into(),
    name: "Summarizer".into(),
    description: "Summarizes long documents".into(),
    tags: vec!["nlp".into(), "summarization".into()],
    examples: Some(vec![
        "Summarize this research paper".into(),
        "Give me a 3-sentence summary".into(),
    ]),
    input_modes: Some(vec!["text/plain".into(), "application/pdf".into()]),
    output_modes: Some(vec!["text/plain".into()]),
    security_requirements: None,
};
```
Skills can override the agent's default input/output modes and declare their own security requirements.
Capabilities
The AgentCapabilities struct advertises what the agent supports:
```rust
use a2a_protocol_sdk::prelude::AgentCapabilities;

let caps = AgentCapabilities::none()
    .with_streaming(true)            // Supports SendStreamingMessage
    .with_push_notifications(true)   // Supports push notification configs
    .with_extended_agent_card(true); // Has an authenticated extended card
```
Note: `AgentCapabilities` is `#[non_exhaustive]` — always construct it via `AgentCapabilities::none()` and the builder methods, never with a struct literal.
Interfaces
Each interface describes a transport endpoint:
```rust
use a2a_protocol_sdk::types::agent_card::AgentInterface;

let interface = AgentInterface {
    url: "https://agent.example.com/rpc".into(),
    protocol_binding: "JSONRPC".into(), // or "REST", "GRPC"
    protocol_version: "1.0.0".into(),
    tenant: None, // Optional: fixed tenant for this interface
};
```
An agent must have at least one interface. Having multiple interfaces (e.g., JSON-RPC and REST) lets clients choose their preferred transport.
Serving Agent Cards
Static Handler
For agent cards that don't change at runtime:
```rust
use a2a_protocol_sdk::server::RequestHandlerBuilder;

let handler = RequestHandlerBuilder::new(my_executor)
    .with_agent_card(make_agent_card())
    .build()
    .unwrap();
```
The static handler automatically provides:
- ETag headers for cache validation
- Last-Modified timestamps
- Cache-Control directives
- 304 Not Modified responses for conditional requests
Dynamic Handler
For agent cards that change (e.g., based on feature flags, load, or authentication):
```rust
use a2a_protocol_sdk::server::{AgentCardProducer, DynamicAgentCardHandler};
use a2a_protocol_sdk::types::agent_card::AgentCard;

struct MyCardProducer;

impl AgentCardProducer for MyCardProducer {
    fn produce<'a>(&'a self) -> Pin<Box<dyn Future<Output = A2aResult<AgentCard>> + Send + 'a>> {
        Box::pin(async move {
            // Generate card dynamically
            Ok(make_agent_card())
        })
    }
}
```
The dynamic handler calls the producer on every request, computes a fresh ETag, and handles conditional caching.
Hot-Reload Handler
For agent cards loaded from a JSON file that may change at runtime:
```rust
use a2a_protocol_sdk::server::HotReloadAgentCardHandler;
use std::path::Path;
use std::time::Duration;

let handler = HotReloadAgentCardHandler::new(initial_card);

// Cross-platform: poll the file every 30 seconds
let watcher = handler.spawn_poll_watcher(
    Path::new("/etc/a2a/agent.json"),
    Duration::from_secs(30),
);

// Unix only: reload on SIGHUP
#[cfg(unix)]
let signal_watcher = handler.spawn_signal_watcher(
    Path::new("/etc/a2a/agent.json"),
);
```
`HotReloadAgentCardHandler` implements `AgentCardProducer`, so it plugs directly into `DynamicAgentCardHandler` for full HTTP caching support. The internal `Arc<RwLock<AgentCard>>` makes each reload atomic: readers always see either the old card or the new one, never a partial update.
HTTP Caching
Agent card responses include standard HTTP caching headers (RFC 7232):
| Header | Purpose |
|---|---|
| `ETag` | Content hash for cache validation |
| `Last-Modified` | Timestamp of last change |
| `Cache-Control` | `public, max-age=60` (configurable) |
Clients should send If-None-Match or If-Modified-Since headers. If the card hasn't changed, the server returns 304 Not Modified with no body.
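The revalidation logic amounts to comparing the client's cached validator against the current ETag. A simplified sketch, ignoring `If-Modified-Since` and weak validators (the function name and signature are illustrative, not the library's API):

```rust
/// Decide between 304 Not Modified and 200 OK based on If-None-Match
/// (RFC 7232). Simplified sketch: real servers also handle weak
/// validators (W/"...") and If-Modified-Since fallback.
fn response_status(current_etag: &str, if_none_match: Option<&str>) -> u16 {
    match if_none_match {
        // "*" matches any current representation; otherwise the header
        // is a comma-separated list of entity tags the client has cached.
        Some(header)
            if header == "*"
                || header.split(',').any(|tag| tag.trim() == current_etag) =>
        {
            304 // Client's copy is fresh; no body sent
        }
        _ => 200, // Send the full card with a fresh ETag
    }
}
```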
Security
Agent cards can declare security requirements using OpenAPI-style schemes:
```json
{
  "securitySchemes": {
    "bearer": {
      "type": "http",
      "scheme": "bearer"
    }
  },
  "securityRequirements": [
    { "bearer": [] }
  ]
}
```
Individual skills can also declare their own security requirements, overriding the global ones.
Extended Agent Cards
If the agent advertises the `extendedAgentCard` capability, clients can fetch an authenticated version with additional details via the `GetExtendedAgentCard` method.
Next Steps
- Tasks & Messages — The data model for agent communication
- Streaming with SSE — Real-time event delivery
Tasks & Messages
Tasks and messages are the core data model of the A2A protocol. Understanding their structure and lifecycle is essential for building agents.
Tasks
A Task represents a unit of work. Every SendMessage call creates one.
Task Structure
```rust
pub struct Task {
    pub id: TaskId,                          // Server-assigned unique ID
    pub context_id: ContextId,               // Conversation thread ID
    pub status: TaskStatus,                  // Current state + optional message
    pub history: Option<Vec<Message>>,       // Previous messages in context
    pub artifacts: Option<Vec<Artifact>>,    // Produced results
    pub metadata: Option<serde_json::Value>, // Arbitrary key-value data
}
```
Task States
Tasks follow a state machine with validated transitions:
| State | Meaning | Terminal? |
|---|---|---|
| `Submitted` | Received, not yet started | No |
| `Working` | Actively processing | No |
| `InputRequired` | Needs more input from client | No |
| `AuthRequired` | Needs authentication | No |
| `Completed` | Finished successfully | Yes |
| `Failed` | Finished with error | Yes |
| `Canceled` | Canceled by client | Yes |
| `Rejected` | Rejected before execution | Yes |
Valid Transitions
Not all state transitions are allowed. The library enforces these rules:
```rust
use a2a_protocol_sdk::prelude::TaskState;

// Check if a transition is valid
assert!(TaskState::Submitted.can_transition_to(TaskState::Working));
assert!(TaskState::Working.can_transition_to(TaskState::Completed));

// Terminal states cannot transition
assert!(!TaskState::Completed.can_transition_to(TaskState::Working));
assert!(!TaskState::Failed.can_transition_to(TaskState::Working));

// Check if a state is terminal
assert!(TaskState::Completed.is_terminal());
assert!(!TaskState::Working.is_terminal());
```
Task Status
The status combines a state with an optional message and timestamp:
```rust
use a2a_protocol_sdk::prelude::{TaskStatus, TaskState};

// Without timestamp
let status = TaskStatus::new(TaskState::Working);

// With automatic UTC timestamp
let status = TaskStatus::with_timestamp(TaskState::Completed);
```
Wire Format
On the wire, task states use the TASK_STATE_ prefix:
```json
{
  "id": "task-abc",
  "contextId": "ctx-123",
  "status": {
    "state": "TASK_STATE_COMPLETED",
    "timestamp": "2026-03-15T10:30:00Z"
  },
  "artifacts": [...]
}
```
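The prefix convention can be sketched with a standalone enum. Note this is not the library's `TaskState`: only `TASK_STATE_WORKING` and `TASK_STATE_COMPLETED` appear verbatim in this chapter, and the remaining wire names below are assumed to follow the same pattern.

```rust
/// Illustrative enum (not a2a-rust's TaskState) mapping states to their
/// TASK_STATE_-prefixed wire names. Names other than WORKING/COMPLETED
/// are inferred from the convention, not confirmed from the spec here.
#[derive(Clone, Copy, Debug, PartialEq)]
enum State { Submitted, Working, Completed, Failed, Canceled }

fn wire_name(s: State) -> &'static str {
    match s {
        State::Submitted => "TASK_STATE_SUBMITTED",
        State::Working => "TASK_STATE_WORKING",
        State::Completed => "TASK_STATE_COMPLETED",
        State::Failed => "TASK_STATE_FAILED",
        State::Canceled => "TASK_STATE_CANCELED",
    }
}
```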
Messages
A Message is a structured payload exchanged between client and agent:
```rust
pub struct Message {
    pub id: MessageId,                           // Unique message ID
    pub role: MessageRole,                       // User or Agent
    pub parts: Vec<Part>,                        // Content (≥1 part)
    pub task_id: Option<TaskId>,                 // Associated task
    pub context_id: Option<ContextId>,           // Conversation thread
    pub reference_task_ids: Option<Vec<TaskId>>, // Related tasks
    pub extensions: Option<Vec<String>>,         // Extension URIs
    pub metadata: Option<serde_json::Value>,
}
```
Roles
| Role | Wire Value | Meaning |
|---|---|---|
| `User` | `ROLE_USER` | From the client/human side |
| `Agent` | `ROLE_AGENT` | From the agent/server side |
Creating Messages
```rust
use a2a_protocol_sdk::prelude::*;

let message = Message {
    id: MessageId::new(uuid::Uuid::new_v4().to_string()),
    role: MessageRole::User,
    parts: vec![Part::text("What is 2 + 2?")],
    task_id: None,
    context_id: None,
    reference_task_ids: None,
    extensions: None,
    metadata: None,
};
```
Parts
Parts are the content units within messages and artifacts. Three types are supported:
Text
```rust
let part = Part::text("Hello, agent!");
```

Wire format: `{"type": "text", "text": "Hello, agent!"}`
File (bytes or URI)
```rust
// Inline bytes (base64-encoded)
let part = Part::file_bytes(base64_encoded_string);

// URI reference
let part = Part::file_uri("https://example.com/document.pdf");
```

Wire format (bytes): `{"type": "file", "file": {"bytes": "aGVsbG8="}}`

Wire format (URI): `{"type": "file", "file": {"uri": "https://example.com/document.pdf"}}`
Structured Data
```rust
let part = Part::data(serde_json::json!({
    "table": [
        {"name": "Alice", "score": 95},
        {"name": "Bob", "score": 87}
    ]
}));
```

Wire format: `{"type": "data", "data": {"table": [...]}}`
Part Metadata
Any part can carry optional metadata:
```json
{
  "type": "text",
  "text": "Hello",
  "metadata": {"language": "en"}
}
```
Artifacts
Artifacts are results produced by an agent, delivered as part of a task:
```rust
pub struct Artifact {
    pub id: ArtifactId,
    pub name: Option<String>,
    pub description: Option<String>,
    pub parts: Vec<Part>, // ≥1 part
    pub extensions: Option<Vec<String>>,
    pub metadata: Option<serde_json::Value>,
}
```
Create an artifact:
```rust
use a2a_protocol_sdk::prelude::*;

let artifact = Artifact::new(
    "result-1",
    vec![Part::text("The answer is 42")],
);
```
Streaming Artifacts
Artifacts can be delivered incrementally during streaming:
```rust
// First chunk
queue.write(StreamResponse::ArtifactUpdate(TaskArtifactUpdateEvent {
    task_id: ctx.task_id.clone(),
    context_id: ContextId::new(ctx.context_id.clone()),
    artifact: Artifact::new("doc", vec![Part::text("First paragraph...")]),
    append: None,
    last_chunk: Some(false), // More chunks coming
    metadata: None,
})).await?;

// Final chunk
queue.write(StreamResponse::ArtifactUpdate(TaskArtifactUpdateEvent {
    task_id: ctx.task_id.clone(),
    context_id: ContextId::new(ctx.context_id.clone()),
    artifact: Artifact::new("doc", vec![Part::text("Last paragraph.")]),
    append: Some(true),     // Append to previous
    last_chunk: Some(true), // This is the last chunk
    metadata: None,
})).await?;
```
ID Types
a2a-rust uses newtype wrappers for type safety:
| Type | Wraps | Example |
|---|---|---|
| `TaskId` | `String` | `TaskId::new("task-abc")` |
| `ContextId` | `String` | `ContextId::new("ctx-123")` |
| `MessageId` | `String` | `MessageId::new("msg-456")` |
| `ArtifactId` | `String` | Constructed inside `Artifact::new` |
These prevent accidentally passing a task ID where a context ID is expected.
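The mechanism is the ordinary Rust newtype pattern. A minimal sketch (not the library's actual definitions):

```rust
/// Sketch of the newtype pattern described above. Each ID wraps a String
/// but is a distinct type, so the compiler rejects passing a ContextId
/// where a TaskId is expected. Illustrative only; a2a-rust's real types
/// carry more derives and validation.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub struct TaskId(String);

#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub struct ContextId(String);

impl TaskId {
    pub fn new(s: impl Into<String>) -> Self { TaskId(s.into()) }
    pub fn as_str(&self) -> &str { &self.0 }
}

impl ContextId {
    pub fn new(s: impl Into<String>) -> Self { ContextId(s.into()) }
    pub fn as_str(&self) -> &str { &self.0 }
}

// fn lookup(id: &TaskId) { /* ... */ }
// lookup(&ContextId::new("ctx-123")); // compile error: mismatched types
```

The safety is entirely compile-time: both types have the same runtime representation as a plain `String`.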
Next Steps
- Streaming with SSE — Real-time event delivery
- The AgentExecutor Trait — Using tasks and messages in your agent
Streaming with SSE
A2A uses Server-Sent Events (SSE) for real-time streaming. This enables agents to deliver progress updates, partial results, and artifacts as they're produced — instead of making the client wait for the complete response.
How SSE Streaming Works
When a client calls SendStreamingMessage, the server holds the HTTP connection open and sends events as they occur:
```
HTTP/1.1 200 OK
Content-Type: text/event-stream

data: {"taskId":"t-1","contextId":"ctx-1","status":{"state":"TASK_STATE_WORKING"}}

data: {"taskId":"t-1","contextId":"ctx-1","artifact":{"artifactId":"a-1","parts":[{"text":"partial..."}]},"lastChunk":false}

data: {"taskId":"t-1","contextId":"ctx-1","artifact":{"artifactId":"a-1","parts":[{"text":"complete result"}]},"lastChunk":true}

data: {"taskId":"t-1","contextId":"ctx-1","status":{"state":"TASK_STATE_COMPLETED"}}
```
Each `data:` line is a complete JSON object. Events are separated by blank lines.
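A toy version of the client-side framing logic, assuming the whole body is already buffered (the real parser in a2a-protocol-client handles partial lines, comments, and buffer caps; this sketch only shows the `data:` extraction):

```rust
/// Collect the payload of each `data:` line from a fully buffered SSE
/// body. Per the SSE format, an optional single space after the colon
/// is not part of the payload. Illustrative only.
fn sse_payloads(body: &str) -> Vec<String> {
    body.lines()
        .filter_map(|line| line.strip_prefix("data:"))
        .map(|payload| payload.strip_prefix(' ').unwrap_or(payload).to_string())
        .collect()
}
```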
Stream Event Types
Four types of events can appear in a stream:
StatusUpdate
Reports a task state transition:
```json
{
  "taskId": "task-abc",
  "contextId": "ctx-123",
  "status": {
    "state": "TASK_STATE_WORKING",
    "timestamp": "2026-03-15T10:30:00Z"
  }
}
```
ArtifactUpdate
Delivers artifact content (potentially in chunks):
```json
{
  "taskId": "task-abc",
  "contextId": "ctx-123",
  "artifact": {
    "artifactId": "result-1",
    "parts": [{"text": "The answer is..."}]
  },
  "lastChunk": false,
  "append": false
}
```
Task
A complete task snapshot (usually the final event):
```json
{
  "id": "task-abc",
  "contextId": "ctx-123",
  "status": {"state": "TASK_STATE_COMPLETED"},
  "artifacts": [...]
}
```
Message
A direct message response (for simple request/reply patterns):
```json
{
  "messageId": "msg-456",
  "role": "ROLE_AGENT",
  "parts": [{"text": "Quick answer"}]
}
```
Server-Side: Writing Events
In your AgentExecutor, write events to the queue:
```rust
impl AgentExecutor for MyExecutor {
    fn execute<'a>(
        &'a self,
        ctx: &'a RequestContext,
        queue: &'a dyn EventQueueWriter,
    ) -> Pin<Box<dyn Future<Output = A2aResult<()>> + Send + 'a>> {
        Box::pin(async move {
            // Signal start
            queue.write(StreamResponse::StatusUpdate(TaskStatusUpdateEvent {
                task_id: ctx.task_id.clone(),
                context_id: ContextId::new(ctx.context_id.clone()),
                status: TaskStatus::new(TaskState::Working),
                metadata: None,
            })).await?;

            // Deliver results in chunks
            for (i, chunk) in results.iter().enumerate() {
                queue.write(StreamResponse::ArtifactUpdate(TaskArtifactUpdateEvent {
                    task_id: ctx.task_id.clone(),
                    context_id: ContextId::new(ctx.context_id.clone()),
                    artifact: Artifact::new("output", vec![Part::text(chunk)]),
                    append: Some(i > 0),
                    last_chunk: Some(i == results.len() - 1),
                    metadata: None,
                })).await?;
            }

            // Signal completion
            queue.write(StreamResponse::StatusUpdate(TaskStatusUpdateEvent {
                task_id: ctx.task_id.clone(),
                context_id: ContextId::new(ctx.context_id.clone()),
                status: TaskStatus::new(TaskState::Completed),
                metadata: None,
            })).await?;

            Ok(())
        })
    }
}
```
Queue Limits
The event queue uses tokio::sync::broadcast channels for fan-out to multiple subscribers:
| Limit | Default | Purpose |
|---|---|---|
| Queue capacity | 64 events | Broadcast channel ring buffer size |
| Max event size | 16 MiB | Rejects oversized events |
With broadcast channels, writes never block — if a reader is too slow, it receives a Lagged notification and skips missed events. The task store is the source of truth; SSE is best-effort notification.
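The skip-on-lag behavior can be modeled with a tiny fixed-capacity log. This is a sketch of the semantics, not of tokio's broadcast implementation:

```rust
/// Toy model of broadcast-channel lag: writes never block, only the last
/// `capacity` events are retained, and a reader that fell behind skips
/// ahead and learns how many events it missed (tokio reports this as a
/// Lagged error). Illustrative only.
struct LossyLog {
    capacity: usize,
    events: Vec<String>, // a real channel uses a ring buffer
}

impl LossyLog {
    fn new(capacity: usize) -> Self {
        LossyLog { capacity, events: Vec::new() }
    }

    /// Writes never block or fail, regardless of reader speed.
    fn write(&mut self, event: &str) {
        self.events.push(event.to_string());
    }

    /// Read from `cursor`. If the reader lagged past the retained window,
    /// jump to the oldest retained event and report the missed count.
    /// Returns (next_cursor, event, missed).
    fn read(&self, cursor: usize) -> (usize, Option<&str>, u64) {
        let oldest = self.events.len().saturating_sub(self.capacity);
        let missed = oldest.saturating_sub(cursor) as u64;
        let pos = cursor.max(oldest);
        match self.events.get(pos) {
            Some(ev) => (pos + 1, Some(ev.as_str()), missed),
            None => (pos, None, missed),
        }
    }
}
```

This is why the task store, not the stream, is the source of truth: a lagged reader recovers current state by re-fetching the task, not by replaying lost events.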
Configure these via the builder:
```rust
RequestHandlerBuilder::new(executor)
    .with_event_queue_capacity(128)
    .with_max_event_size(8 * 1024 * 1024) // 8 MiB
    .build()
```
Client-Side: Consuming Streams
Use stream_message to receive events:
```rust
let mut stream = client
    .stream_message(params)
    .await
    .expect("connect");

while let Some(event) = stream.next().await {
    match event {
        Ok(StreamResponse::StatusUpdate(ev)) => {
            println!("State: {:?}", ev.status.state);
        }
        Ok(StreamResponse::ArtifactUpdate(ev)) => {
            println!("Artifact: {}", ev.artifact.id);
        }
        Ok(StreamResponse::Task(task)) => {
            println!("Final task: {:?}", task.status.state);
        }
        Ok(StreamResponse::Message(msg)) => {
            println!("Message: {:?}", msg);
        }
        Ok(_) => {
            // Future event types — handle gracefully
        }
        Err(e) => {
            eprintln!("Stream error: {e}");
            break;
        }
    }
}
```
Client Protections
The SSE parser includes safety limits:
- 16 MiB buffer cap — Prevents OOM from malicious servers
- 30-second connect timeout — Fails fast on unreachable servers
- Partial line buffering — Handles TCP frame boundaries correctly
Re-subscribing
If a stream disconnects, re-subscribe to an existing task:
```rust
let mut stream = client
    .subscribe_to_task("task-abc")
    .await
    .expect("resubscribe");
```
The server creates a new broadcast subscriber for the task's event queue. Multiple SSE connections can be active simultaneously for the same task — each receives all events published after it subscribes. If a reader falls behind, it receives a Lagged notification and skips missed events rather than blocking other readers or the writer.
Streaming vs Synchronous
| Aspect | SendMessage | SendStreamingMessage |
|---|---|---|
| Response | Complete task | SSE event stream |
| Progress | No intermediate updates | Real-time updates |
| Long tasks | Client waits | Client sees progress |
| Network | Single request/response | Held connection |
| Complexity | Simple | Requires event handling |
Use streaming when:
- Tasks take more than a few seconds
- You want to show progress to users
- You need incremental artifact delivery
Use synchronous when:
- Tasks complete quickly
- You don't need progress updates
- Simplicity is more important than responsiveness
Next Steps
- Building a Client — Client configuration for streaming
- Streaming Responses — Advanced client streaming patterns
- The AgentExecutor Trait — Writing streaming executors
The AgentExecutor Trait
The AgentExecutor trait is the heart of every A2A agent. It defines what happens when a message arrives. Everything else — HTTP handling, task management, streaming — is handled by the framework.
The Trait
```rust
pub trait AgentExecutor: Send + Sync + 'static {
    /// Called when a message arrives (SendMessage or SendStreamingMessage).
    fn execute<'a>(
        &'a self,
        ctx: &'a RequestContext,
        queue: &'a dyn EventQueueWriter,
    ) -> Pin<Box<dyn Future<Output = A2aResult<()>> + Send + 'a>>;

    /// Called when a client requests task cancellation.
    /// Default: returns "task not cancelable" error.
    fn cancel<'a>(
        &'a self,
        ctx: &'a RequestContext,
        queue: &'a dyn EventQueueWriter,
    ) -> Pin<Box<dyn Future<Output = A2aResult<()>> + Send + 'a>>;

    /// Called during graceful server shutdown.
    /// Default: no-op.
    fn on_shutdown<'a>(
        &'a self,
    ) -> Pin<Box<dyn Future<Output = ()> + Send + 'a>>;
}
```
Why Pin<Box<dyn Future>>?
This signature ensures object safety — the trait can be stored as Arc<dyn AgentExecutor> and shared across threads. Standard async fn in traits would prevent this. The Box::pin(async move { ... }) wrapper is the idiomatic pattern:
```rust
impl AgentExecutor for MyAgent {
    fn execute<'a>(
        &'a self,
        ctx: &'a RequestContext,
        queue: &'a dyn EventQueueWriter,
    ) -> Pin<Box<dyn Future<Output = A2aResult<()>> + Send + 'a>> {
        Box::pin(async move {
            // Your async logic here
            Ok(())
        })
    }
}
```
Ergonomic Helpers
The executor_helpers module provides shortcuts to reduce boilerplate.
boxed_future helper
Wraps an async block into the required Pin<Box<dyn Future>>:
```rust
use a2a_protocol_server::executor_helpers::boxed_future;

fn execute<'a>(
    &'a self,
    ctx: &'a RequestContext,
    queue: &'a dyn EventQueueWriter,
) -> Pin<Box<dyn Future<Output = A2aResult<()>> + Send + 'a>> {
    boxed_future(async move {
        // Your logic here — no Box::pin wrapper needed!
        Ok(())
    })
}
```
agent_executor! macro
Generates the full AgentExecutor impl from a closure-like syntax:
```rust
use a2a_protocol_server::agent_executor;

struct EchoAgent;

// Simple form (execute only)
agent_executor!(EchoAgent, |ctx, queue| async {
    Ok(())
});

// With cancel handler
agent_executor!(CancelableAgent,
    execute: |ctx, queue| async { Ok(()) },
    cancel: |ctx, queue| async { Ok(()) }
);
```
EventEmitter helper
Eliminates the repetitive task_id.clone() / context_id.clone() in every event:
```rust
use a2a_protocol_server::executor_helpers::EventEmitter;

agent_executor!(MyAgent, |ctx, queue| async {
    let emit = EventEmitter::new(ctx, queue);

    emit.status(TaskState::Working).await?;
    emit.artifact("result", vec![Part::text("done")], None, Some(true)).await?;

    if emit.is_cancelled() {
        emit.status(TaskState::Canceled).await?;
        return Ok(());
    }

    emit.status(TaskState::Completed).await?;
    Ok(())
});
```
| Method | Description |
|---|---|
| `status(TaskState)` | Emit a status update event |
| `artifact(id, parts, append, last_chunk)` | Emit an artifact update event |
| `is_cancelled()` | Check if the task was cancelled |
RequestContext
The RequestContext provides information about the incoming request:
| Field | Type | Description |
|---|---|---|
| `task_id` | `TaskId` | Server-assigned task ID |
| `context_id` | `String` | Conversation context ID |
| `message` | `Message` | The incoming message with parts |
| `stored_task` | `Option<Task>` | Previously stored task snapshot (for continuations) |
| `metadata` | `Option<Value>` | Arbitrary metadata from the request |
| `cancellation_token` | `CancellationToken` | Token for cooperative cancellation |
EventQueueWriter
The queue is your channel for sending events back to the client:
```rust
// Write a status update
queue.write(StreamResponse::StatusUpdate(TaskStatusUpdateEvent {
    task_id: ctx.task_id.clone(),
    context_id: ContextId::new(ctx.context_id.clone()),
    status: TaskStatus::new(TaskState::Working),
    metadata: None,
})).await?;

// Write an artifact
queue.write(StreamResponse::ArtifactUpdate(TaskArtifactUpdateEvent {
    task_id: ctx.task_id.clone(),
    context_id: ContextId::new(ctx.context_id.clone()),
    artifact: Artifact::new("result", vec![Part::text("output")]),
    append: None,
    last_chunk: Some(true),
    metadata: None,
})).await?;
```
For synchronous clients (SendMessage), the handler collects all events and assembles the final Task response. For streaming clients (SendStreamingMessage), events are delivered as SSE in real time. Your executor doesn't need to know which mode the client used — just write events to the queue.
Common Patterns
The Standard Three-Event Pattern
Most executors follow this structure:
```rust
Box::pin(async move {
    // 1. Working
    queue.write(StreamResponse::StatusUpdate(/* Working */)).await?;

    // 2. Produce results
    let result = do_work(&ctx.message).await?;
    queue.write(StreamResponse::ArtifactUpdate(/* result */)).await?;

    // 3. Completed
    queue.write(StreamResponse::StatusUpdate(/* Completed */)).await?;

    Ok(())
})
```
Error Handling
If your executor encounters an error, transition to Failed with a descriptive message:
```rust
Box::pin(async move {
    queue.write(/* Working */).await?;

    match do_work(&ctx.message).await {
        Ok(result) => {
            queue.write(/* ArtifactUpdate with result */).await?;
            queue.write(/* Completed */).await?;
        }
        Err(e) => {
            queue.write(StreamResponse::StatusUpdate(TaskStatusUpdateEvent {
                task_id: ctx.task_id.clone(),
                context_id: ContextId::new(ctx.context_id.clone()),
                status: TaskStatus {
                    state: TaskState::Failed,
                    message: Some(Message {
                        id: MessageId::new(uuid::Uuid::new_v4().to_string()),
                        role: MessageRole::Agent,
                        parts: vec![Part::text(&format!("Error: {e}"))],
                        task_id: None,
                        context_id: None,
                        reference_task_ids: None,
                        extensions: None,
                        metadata: None,
                    }),
                    timestamp: None,
                },
                metadata: None,
            })).await?;
        }
    }

    Ok(())
})
```
Requesting More Input
When the agent needs clarification:
```rust
queue.write(StreamResponse::StatusUpdate(TaskStatusUpdateEvent {
    task_id: ctx.task_id.clone(),
    context_id: ContextId::new(ctx.context_id.clone()),
    status: TaskStatus {
        state: TaskState::InputRequired,
        message: Some(Message {
            id: MessageId::new(uuid::Uuid::new_v4().to_string()),
            role: MessageRole::Agent,
            parts: vec![Part::text("Which format would you like: PDF or HTML?")],
            // ...remaining fields
            task_id: None,
            context_id: None,
            reference_task_ids: None,
            extensions: None,
            metadata: None,
        }),
        timestamp: None,
    },
    metadata: None,
})).await?;
```
The client can then send another message with the same context_id to continue the conversation.
Supporting Cancellation
Override the cancel method:
```rust
fn cancel<'a>(
    &'a self,
    ctx: &'a RequestContext,
    queue: &'a dyn EventQueueWriter,
) -> Pin<Box<dyn Future<Output = A2aResult<()>> + Send + 'a>> {
    Box::pin(async move {
        // Clean up any in-progress work
        self.cancel_token.cancel();

        queue.write(StreamResponse::StatusUpdate(TaskStatusUpdateEvent {
            task_id: ctx.task_id.clone(),
            context_id: ContextId::new(ctx.context_id.clone()),
            status: TaskStatus::new(TaskState::Canceled),
            metadata: None,
        })).await?;

        Ok(())
    })
}
```
Executor with State
Executors can hold state — database connections, model handles, configuration:
```rust
struct LlmExecutor {
    model: Arc<Model>,
    db: Arc<DatabasePool>,
    max_tokens: usize,
}

impl AgentExecutor for LlmExecutor {
    fn execute<'a>(
        &'a self,
        ctx: &'a RequestContext,
        queue: &'a dyn EventQueueWriter,
    ) -> Pin<Box<dyn Future<Output = A2aResult<()>> + Send + 'a>> {
        Box::pin(async move {
            // Access self.model, self.db, self.max_tokens
            let response = self.model.generate(&input, self.max_tokens).await?;
            // ...
            Ok(())
        })
    }
}
```
Because the trait requires Send + Sync + 'static, the executor must be safe to share across threads. Use Arc for shared state.
Executor Timeout
The builder can set a timeout that kills hung executors:
```rust
use std::time::Duration;

RequestHandlerBuilder::new(my_executor)
    .with_executor_timeout(Duration::from_secs(300)) // 5 minutes
    .build()
```
If the executor doesn't complete within the timeout, the task transitions to Failed automatically.
Next Steps
- Request Handler & Builder — Configuring the handler around your executor
- Push Notifications — Delivering results asynchronously
Request Handler & Builder
The RequestHandler is the central orchestrator that connects your executor to the protocol. It manages task lifecycle, storage, streaming, push notifications, and interceptors. You build one using RequestHandlerBuilder.
Building a Handler
Minimal Setup
```rust
use a2a_protocol_sdk::server::RequestHandlerBuilder;

let handler = RequestHandlerBuilder::new(MyExecutor)
    .build()
    .expect("build handler");
```
This gives you sensible defaults:
- In-memory task store
- In-memory push config store
- No push sender (webhooks disabled)
- No interceptors
- No agent card
- No executor timeout
Full Configuration
```rust
use a2a_protocol_sdk::server::RequestHandlerBuilder;
use std::time::Duration;

let handler = RequestHandlerBuilder::new(MyExecutor)
    // Agent card for discovery
    .with_agent_card(make_agent_card())
    // Task storage
    .with_task_store_config(TaskStoreConfig {
        task_ttl: Some(Duration::from_secs(3600)), // 1 hour TTL
        max_capacity: Some(10_000),                // Max 10k tasks
        ..Default::default()
    })
    // Push notifications
    .with_push_sender(HttpPushSender::new())
    // Interceptors
    .with_interceptor(AuthInterceptor::new())
    .with_interceptor(LoggingInterceptor::new())
    // Executor limits
    .with_executor_timeout(Duration::from_secs(300))
    // Streaming limits
    .with_event_queue_capacity(128)
    .with_max_event_size(8 * 1024 * 1024) // 8 MiB
    .with_max_concurrent_streams(1000)
    .build()
    .expect("build handler");
```
Builder Methods Reference
Required
| Method | Description |
|---|---|
new(executor) | Set the agent executor (type-erased to Arc<dyn AgentExecutor>) |
Optional
| Method | Default | Description |
|---|---|---|
with_agent_card(AgentCard) | None | Discovery card for /.well-known/agent.json |
with_task_store(impl TaskStore) | InMemoryTaskStore | Custom task storage backend |
with_task_store_config(TaskStoreConfig) | 1hr TTL, 10k capacity | TTL and capacity for the default store |
with_push_config_store(impl PushConfigStore) | InMemoryPushConfigStore | Custom push config storage |
with_push_sender(impl PushSender) | None | Webhook delivery implementation |
with_interceptor(impl ServerInterceptor) | Empty chain | Add a server interceptor |
with_executor_timeout(Duration) | None | Timeout for executor completion |
with_event_queue_capacity(usize) | 64 | Bounded channel size per stream |
with_max_event_size(usize) | 16 MiB | Maximum serialized event size |
with_max_concurrent_streams(usize) | Unbounded | Limit concurrent SSE streams |
Build-Time Validation
build() validates:
- If an agent card is provided, it must have at least one `supported_interfaces` entry
- Executor timeout (if set) must not be zero
Sharing the Handler
The handler is wrapped in Arc for sharing between dispatchers:
```rust
use std::sync::Arc;

let handler = Arc::new(
    RequestHandlerBuilder::new(MyExecutor)
        .build()
        .unwrap()
);

// Both dispatchers share the same handler
let jsonrpc = JsonRpcDispatcher::new(Arc::clone(&handler));
let rest = RestDispatcher::new(handler);
```
This means JSON-RPC and REST clients share the same task store, push configs, and executor.
Task Store Configuration
The default InMemoryTaskStore supports TTL and capacity limits:
```rust
use a2a_protocol_sdk::server::TaskStoreConfig;
use std::time::Duration;

let config = TaskStoreConfig {
    task_ttl: Some(Duration::from_secs(3600)), // Tasks expire after 1 hour
    max_capacity: Some(50_000),                // Keep at most 50k tasks
    ..Default::default()
};

RequestHandlerBuilder::new(executor)
    .with_task_store_config(config)
    .build()
```
When capacity is exceeded, the oldest tasks are evicted. When TTL expires, tasks are cleaned up on the next access.
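The eviction rules can be illustrated with a small self-contained sketch. This is not the library's implementation: `MiniStore`, its logical clock, and its field names are invented for illustration, with a `u64` tick counter standing in for wall time.

```rust
use std::collections::HashMap;

// Hypothetical miniature store: TTL eviction happens lazily on access,
// capacity eviction drops the oldest entry on insert.
struct MiniStore {
    ttl: u64,                              // lifetime in ticks
    max_capacity: usize,
    tasks: HashMap<String, (u64, String)>, // id -> (inserted_at, payload)
}

impl MiniStore {
    fn save(&mut self, now: u64, id: &str, payload: &str) {
        // Capacity eviction: drop the oldest entry once the limit is reached.
        if self.tasks.len() >= self.max_capacity && !self.tasks.contains_key(id) {
            if let Some(oldest) = self
                .tasks
                .iter()
                .min_by_key(|(_, (t, _))| *t)
                .map(|(k, _)| k.clone())
            {
                self.tasks.remove(&oldest);
            }
        }
        self.tasks.insert(id.to_string(), (now, payload.to_string()));
    }

    fn get(&mut self, now: u64, id: &str) -> Option<String> {
        // TTL check happens on access, not in a background sweeper.
        let expired = match self.tasks.get(id) {
            Some((inserted, _)) => now.saturating_sub(*inserted) >= self.ttl,
            None => return None,
        };
        if expired {
            self.tasks.remove(id);
            return None;
        }
        self.tasks.get(id).map(|(_, p)| p.clone())
    }
}

fn main() {
    let mut store = MiniStore { ttl: 10, max_capacity: 2, tasks: HashMap::new() };
    store.save(0, "a", "first");
    store.save(1, "b", "second");
    store.save(2, "c", "third");           // capacity hit: "a" (oldest) evicted
    assert!(store.get(3, "a").is_none());
    assert_eq!(store.get(3, "b").as_deref(), Some("second"));
    assert!(store.get(11, "b").is_none()); // TTL expired, cleaned up on access
}
```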
Custom Task Stores
For production use, implement the TaskStore trait for your database:
```rust
use a2a_protocol_sdk::server::TaskStore;

struct PostgresTaskStore { /* ... */ }

impl TaskStore for PostgresTaskStore {
    // Implement get, put, list, delete...
}

RequestHandlerBuilder::new(executor)
    .with_task_store(PostgresTaskStore::new(pool))
    .build()
```
See Task & Config Stores for the full trait API.
Next Steps
- Dispatchers — HTTP dispatchers for JSON-RPC and REST
- Interceptors & Middleware — Request/response hooks
- Task & Config Stores — Custom storage backends
Dispatchers (JSON-RPC, REST & gRPC)
Dispatchers translate HTTP/gRPC requests into handler calls. a2a-rust provides four built-in dispatchers: JsonRpcDispatcher, RestDispatcher, WebSocketDispatcher (websocket feature), and GrpcDispatcher (grpc feature).
JsonRpcDispatcher
Routes JSON-RPC 2.0 requests to the handler:
```rust
use a2a_protocol_sdk::server::JsonRpcDispatcher;
use std::sync::Arc;

let dispatcher = Arc::new(JsonRpcDispatcher::new(handler));
```
Features
- Single endpoint — All methods go to `/` as POST requests
- Agent card — `GET /.well-known/agent.json` returns the agent card (same as REST)
- Batch support — Handles JSON-RPC batch arrays
- ID preservation — Echoes back the exact request ID (string, number, float, null)
- Streaming — `SendStreamingMessage` and `SubscribeToTask` return SSE streams
- CORS — Configurable cross-origin headers
- Content type — Accepts `application/json`
Batch Restrictions
Streaming methods cannot appear in batch requests:
- `SendStreamingMessage` in a batch → error response
- `SubscribeToTask` in a batch → error response
An empty batch [] returns a parse error.
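These batch rules can be sketched as a simple validation function. The method names mirror the A2A method set described above, but `validate_batch` and `BatchError` are invented for illustration, not the dispatcher's actual code:

```rust
// Sketch of the batch restrictions: an empty batch is a parse error, and
// streaming methods (which return SSE) cannot be multiplexed in a batch.
#[derive(Debug, PartialEq)]
enum BatchError {
    EmptyBatch,
    StreamingNotAllowed(&'static str),
}

fn validate_batch(methods: &[&'static str]) -> Result<(), BatchError> {
    if methods.is_empty() {
        return Err(BatchError::EmptyBatch); // [] -> parse error
    }
    for m in methods {
        if *m == "SendStreamingMessage" || *m == "SubscribeToTask" {
            return Err(BatchError::StreamingNotAllowed(m));
        }
    }
    Ok(())
}

fn main() {
    assert_eq!(validate_batch(&[]), Err(BatchError::EmptyBatch));
    assert_eq!(
        validate_batch(&["SendMessage", "SubscribeToTask"]),
        Err(BatchError::StreamingNotAllowed("SubscribeToTask"))
    );
    assert!(validate_batch(&["SendMessage", "GetTask"]).is_ok());
}
```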
RestDispatcher
Routes RESTful HTTP requests to the handler:
```rust
use a2a_protocol_sdk::server::RestDispatcher;
use std::sync::Arc;

let dispatcher = Arc::new(RestDispatcher::new(handler));
```
Route Table
| Method | Path | Handler |
|---|---|---|
POST | /message:send | SendMessage |
POST | /message:stream | SendStreamingMessage |
GET | /tasks | ListTasks |
GET | /tasks/{id} | GetTask |
POST | /tasks/{id}:cancel | CancelTask |
GET | /tasks/{id}:subscribe | SubscribeToTask |
POST | /tasks/{id}/pushNotificationConfigs | CreatePushConfig |
GET | /tasks/{id}/pushNotificationConfigs | ListPushConfigs |
GET | /tasks/{id}/pushNotificationConfigs/{cfgId} | GetPushConfig |
DELETE | /tasks/{id}/pushNotificationConfigs/{cfgId} | DeletePushConfig |
GET | /.well-known/agent.json | AgentCard |
Multi-Tenancy
Tenant routes are prefixed with /tenants/{tenant-id}/:
```text
GET  /tenants/acme-corp/tasks
GET  /tenants/acme-corp/tasks/{id}
POST /tenants/acme-corp/message:send
```
Built-in Security
The REST dispatcher includes automatic protections:
| Protection | Behavior |
|---|---|
| Path traversal | .. in path segments (including %2E%2E, %2e%2e) → 400 |
| Query string size | Over 4 KiB → 414 |
| Body size | Over 4 MiB → 413 |
| Content type | Accepts application/json and application/a2a+json |
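The path-traversal rule above can be sketched in a few lines. This is an illustrative stand-in, not the dispatcher's internals; `is_traversal` and `decode_dots` are invented names, and a real implementation would perform full percent-decoding:

```rust
// Reject any path segment that decodes to "..", covering the %2E / %2e
// encodings called out in the table above.
fn decode_dots(segment: &str) -> String {
    segment.replace("%2E", ".").replace("%2e", ".")
}

fn is_traversal(path: &str) -> bool {
    path.split('/').any(|seg| decode_dots(seg) == "..")
}

fn main() {
    assert!(is_traversal("/tasks/../secrets"));
    assert!(is_traversal("/tasks/%2E%2E/secrets"));
    assert!(is_traversal("/tasks/%2e%2e/secrets"));
    assert!(!is_traversal("/tasks/abc..def")); // dots inside a name are fine
}
```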
Server Startup
Using serve() (recommended)
Both dispatchers implement the Dispatcher trait, so you can use the serve() helper to eliminate hyper boilerplate:
```rust
use a2a_protocol_server::serve::{serve, serve_with_addr};

// Blocking — runs the accept loop on the current task
serve("127.0.0.1:3000", JsonRpcDispatcher::new(handler)).await?;

// Non-blocking — spawns the server and returns the bound address
let addr = serve_with_addr("127.0.0.1:0", dispatcher).await?;
println!("Listening on {addr}");
```
Manual wiring (advanced)
Both dispatchers also expose a dispatch method for direct hyper integration:
```rust
use std::sync::Arc;

async fn start_server(
    dispatcher: Arc<JsonRpcDispatcher>,
    addr: &str,
) {
    let listener = tokio::net::TcpListener::bind(addr)
        .await
        .expect("bind");

    loop {
        let (stream, _) = listener.accept().await.expect("accept");
        let io = hyper_util::rt::TokioIo::new(stream);
        let dispatcher = Arc::clone(&dispatcher);

        tokio::spawn(async move {
            let service = hyper::service::service_fn(move |req| {
                let d = Arc::clone(&dispatcher);
                async move {
                    Ok::<_, std::convert::Infallible>(d.dispatch(req).await)
                }
            });

            let _ = hyper_util::server::conn::auto::Builder::new(
                hyper_util::rt::TokioExecutor::new(),
            )
            .serve_connection(io, service)
            .await;
        });
    }
}
```
No web framework required — the dispatchers work directly with hyper's service layer.
GrpcDispatcher
Routes gRPC requests to the handler via tonic. Enable with the grpc feature flag:
```toml
a2a-protocol-server = { version = "0.2", features = ["grpc"] }
```
```rust
use a2a_protocol_server::{GrpcDispatcher, GrpcConfig};
use std::sync::Arc;

let config = GrpcConfig::default()
    .with_max_message_size(8 * 1024 * 1024)
    .with_concurrency_limit(128);

let dispatcher = GrpcDispatcher::new(handler, config);

// Blocking server
dispatcher.serve("0.0.0.0:50051").await?;

// Non-blocking (returns bound address)
let addr = dispatcher.serve_with_addr("127.0.0.1:0").await?;
println!("gRPC listening on {addr}");

// Pre-bind pattern (when you need the address before building the handler)
let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await?;
let addr = listener.local_addr()?;
// ... build handler using addr for agent card URL ...
let dispatcher = GrpcDispatcher::new(handler, config);
let bound = dispatcher.serve_with_listener(listener)?;
```
GrpcConfig
| Field | Type | Default | Description |
|---|---|---|---|
max_message_size | usize | 4 MiB | Maximum inbound/outbound message size |
concurrency_limit | usize | 256 | Maximum concurrent gRPC requests per connection |
stream_channel_capacity | usize | 64 | Bounded channel for streaming responses |
Protocol
All 11 A2A methods are mapped to gRPC RPCs. JSON payloads are carried inside protobuf bytes fields, reusing the same serde types as JSON-RPC and REST — no duplicate protobuf definitions needed.
Streaming methods (SendStreamingMessage, SubscribeToTask) use gRPC server streaming.
Custom Server Setup
For advanced scenarios, use into_service() to get a tonic service:
```rust
let svc = dispatcher.into_service();

tonic::transport::Server::builder()
    .add_service(svc)
    .serve(addr)
    .await?;
```
Running Multiple Transports
Serve JSON-RPC and REST on different ports with the same handler:
```rust
use a2a_protocol_server::serve::serve_with_addr;

let handler = Arc::new(
    RequestHandlerBuilder::new(MyExecutor)
        .with_agent_card(make_agent_card("http://localhost:3000", "http://localhost:3001"))
        .build()
        .unwrap(),
);

// JSON-RPC on port 3000
let jsonrpc_addr =
    serve_with_addr("127.0.0.1:3000", JsonRpcDispatcher::new(Arc::clone(&handler))).await?;

// REST on port 3001
let rest_addr = serve_with_addr("127.0.0.1:3001", RestDispatcher::new(handler)).await?;
```
CORS Configuration
Both dispatchers support CORS for browser-based clients:
```rust
use a2a_protocol_sdk::server::CorsConfig;

// The dispatchers handle OPTIONS preflight automatically.
// CORS headers are included on all responses.
```
Next Steps
- Push Notifications — Webhook delivery
- Interceptors & Middleware — Request/response hooks
- Production Hardening — Deployment best practices
Push Notifications
Push notifications let agents deliver results asynchronously via webhooks. Instead of the client holding an SSE connection open, the server POSTs events to a URL the client provides.
How Push Notifications Work
```text
Client                    Agent Server              Client Webhook
  │                            │                         │
  │  CreatePushConfig          │                         │
  │ ──────────────────────────►│                         │
  │  Config with ID            │                         │
  │ ◄──────────────────────────│                         │
  │                            │                         │
  │  SendMessage               │                         │
  │ ──────────────────────────►│                         │
  │  Task (submitted)          │                         │
  │ ◄──────────────────────────│                         │
  │                            │                         │
  │                            │  Executor runs          │
  │                            │                         │
  │                            │  POST event             │
  │                            │ ───────────────────────►│
  │                            │  POST event             │
  │                            │ ───────────────────────►│
  │                            │                         │
```
- Client registers a webhook URL via `CreateTaskPushNotificationConfig`
- Client sends a message (with `return_immediately: true` for async)
- Agent processes the message and pushes events to the webhook
Setting Up Push Notifications
Server Side
Enable push by providing a PushSender:
```rust
use a2a_protocol_sdk::server::{RequestHandlerBuilder, HttpPushSender};

let handler = RequestHandlerBuilder::new(my_executor)
    .with_push_sender(HttpPushSender::new())
    .build()
    .unwrap();
```
The built-in HttpPushSender includes:
- SSRF protection — Resolves URLs and rejects private/loopback IP addresses
- Header injection prevention — Validates that credentials contain no `\r` or `\n`
- HTTPS validation — Optionally enforces HTTPS-only webhook URLs
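The SSRF gate can be sketched with the standard library's IP classification helpers. This is an assumption-laden illustration, not the `HttpPushSender` internals; `is_disallowed` is an invented name, and a real check would also cover shared and carrier-grade NAT ranges:

```rust
use std::net::IpAddr;

// After resolving the webhook host, reject loopback, private, link-local,
// and unspecified addresses so the server can't be steered at itself or
// at internal infrastructure.
fn is_disallowed(ip: IpAddr) -> bool {
    match ip {
        IpAddr::V4(v4) => {
            v4.is_loopback() || v4.is_private() || v4.is_link_local() || v4.is_unspecified()
        }
        IpAddr::V6(v6) => v6.is_loopback() || v6.is_unspecified(),
    }
}

fn main() {
    assert!(is_disallowed("127.0.0.1".parse().unwrap()));
    assert!(is_disallowed("10.0.0.5".parse().unwrap()));
    assert!(is_disallowed("192.168.1.10".parse().unwrap()));
    assert!(is_disallowed("::1".parse().unwrap()));
    assert!(!is_disallowed("93.184.216.34".parse().unwrap())); // public address
}
```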
Client Side
Register a push notification configuration:
```rust
use a2a_protocol_sdk::types::push::TaskPushNotificationConfig;

let config = TaskPushNotificationConfig::new(
    "task-abc",                       // Task to watch
    "https://my-service.com/webhook", // Webhook URL
);

let saved = client.set_push_config(config).await?;
println!("Config ID: {:?}", saved.id);
```
Managing Push Configs
```rust
// List all configs for a task
let configs = client.list_push_configs(ListPushConfigsParams {
    tenant: None,
    task_id: "task-abc".into(),
    page_size: None,
    page_token: None,
}).await?;

// Get a specific config
let config = client.get_push_config("task-abc", "config-123").await?;

// Delete a config
client.delete_push_config("task-abc", "config-123").await?;
```
Authentication
Push configs support authentication for the webhook endpoint:
```rust
use a2a_protocol_sdk::types::push::{TaskPushNotificationConfig, AuthenticationInfo};

let mut config = TaskPushNotificationConfig::new("task-abc", "https://webhook.example.com");
config.authentication = Some(AuthenticationInfo {
    scheme: "bearer".into(),
    credentials: "my-secret-token".into(),
});
```
The server includes these credentials in the Authorization header when POSTing to the webhook.
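The header-injection rule mentioned earlier can be illustrated alongside this: credentials containing CR or LF could smuggle extra headers into the webhook request, so they are rejected before the header value is built. `build_auth_header` is an invented helper for illustration, not the library's API:

```rust
// Validate credentials, then build an Authorization header value
// (e.g. scheme "bearer" -> "Bearer <token>").
fn build_auth_header(scheme: &str, credentials: &str) -> Result<String, &'static str> {
    if credentials.contains('\r') || credentials.contains('\n') {
        return Err("credentials contain CR/LF"); // header injection attempt
    }
    let mut s = scheme.to_string();
    if let Some(first) = s.get_mut(0..1) {
        first.make_ascii_uppercase();
    }
    Ok(format!("{s} {credentials}"))
}

fn main() {
    assert_eq!(
        build_auth_header("bearer", "my-secret-token").as_deref(),
        Ok("Bearer my-secret-token")
    );
    assert!(build_auth_header("bearer", "evil\r\nX-Injected: 1").is_err());
}
```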
Custom PushSender
Implement the PushSender trait for custom delivery:
```rust
use a2a_protocol_sdk::server::PushSender;

struct SqsPushSender {
    client: aws_sdk_sqs::Client,
}

impl PushSender for SqsPushSender {
    fn send<'a>(
        &'a self,
        config: &'a TaskPushNotificationConfig,
        event: &'a StreamResponse,
    ) -> Pin<Box<dyn Future<Output = A2aResult<()>> + Send + 'a>> {
        Box::pin(async move {
            // Send event to SQS instead of HTTP webhook
            Ok(())
        })
    }
}
```
Push Config Storage
The default InMemoryPushConfigStore stores configs in memory with per-task limits. For production, implement PushConfigStore:
```rust
use a2a_protocol_sdk::server::PushConfigStore;

struct PostgresPushConfigStore { /* ... */ }

impl PushConfigStore for PostgresPushConfigStore {
    // Implement create, get, list, delete...
}

RequestHandlerBuilder::new(executor)
    .with_push_config_store(PostgresPushConfigStore::new(pool))
    .build()
```
Security Considerations
- Always use HTTPS for webhook URLs in production
- The built-in `HttpPushSender` rejects private IP addresses to prevent SSRF attacks
- Webhook credentials are validated for header injection characters
- Consider rate limiting webhook delivery to prevent abuse
Next Steps
- Interceptors & Middleware — Server-side request hooks
- Task & Config Stores — Persistent storage backends
Interceptors & Middleware
Interceptors let you hook into the request/response pipeline on both the client and server side — for authentication, logging, metrics, rate limiting, or any cross-cutting concern.
Server Interceptors
Server interceptors run before and after the handler processes a request:
```rust
use a2a_protocol_sdk::server::ServerInterceptor;

struct LoggingInterceptor;

impl ServerInterceptor for LoggingInterceptor {
    // Intercept incoming requests and outgoing responses
}
```
Adding Interceptors
```rust
RequestHandlerBuilder::new(my_executor)
    .with_interceptor(AuthInterceptor::new(auth_config))
    .with_interceptor(LoggingInterceptor)
    .with_interceptor(MetricsInterceptor::new())
    .build()
```
Interceptors execute in the order they're added:
Request → Auth → Logging → Metrics → Handler → Metrics → Logging → Auth → Response
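This onion ordering can be demonstrated with a toy trace. The closure-free chain below is invented for illustration and is not the `ServerInterceptor` API; it simply records the order in which request and response hooks would fire:

```rust
// Request hooks run in registration order; response hooks run in reverse.
fn run_chain(names: &[&str]) -> Vec<String> {
    let mut trace = Vec::new();
    for n in names {
        trace.push(format!("{n}:request"));
    }
    trace.push("handler".to_string());
    for n in names.iter().rev() {
        trace.push(format!("{n}:response"));
    }
    trace
}

fn main() {
    let trace = run_chain(&["auth", "logging", "metrics"]);
    assert_eq!(
        trace,
        vec![
            "auth:request", "logging:request", "metrics:request",
            "handler",
            "metrics:response", "logging:response", "auth:response",
        ]
    );
}
```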
Example: Authentication
```rust
struct BearerAuthInterceptor {
    valid_tokens: HashSet<String>,
}

impl ServerInterceptor for BearerAuthInterceptor {
    // Check Authorization header before passing to handler
    // Return 401 if token is missing or invalid
}
```
Client Interceptors
Client interceptors modify outgoing requests and incoming responses:
```rust
use a2a_protocol_sdk::client::CallInterceptor;

struct RequestIdInterceptor;

impl CallInterceptor for RequestIdInterceptor {
    // Add X-Request-Id header to outgoing requests
    // Log the response status
}
```
Adding Client Interceptors
```rust
use a2a_protocol_sdk::client::ClientBuilder;

let client = ClientBuilder::new("http://agent.example.com".into())
    .with_interceptor(RequestIdInterceptor)
    .with_interceptor(RetryInterceptor::new(3))
    .build()
    .unwrap();
```
Common Patterns
Logging
Log method names, durations, and errors:
```rust
struct LoggingInterceptor;

// Log: "SendMessage completed in 42ms"
// Log: "GetTask failed: task not found (15ms)"
```
Metrics
Track request counts, latencies, error rates:
```rust
struct MetricsInterceptor {
    counter: Arc<AtomicU64>,
}

// Increment counter on each request
// Record latency histogram
```
Rate Limiting
The built-in RateLimitInterceptor provides per-caller fixed-window rate limiting:
```rust
use a2a_protocol_sdk::server::{RateLimitInterceptor, RateLimitConfig};
use std::sync::Arc;

let limiter = Arc::new(RateLimitInterceptor::new(RateLimitConfig {
    requests_per_window: 100,
    window_secs: 60,
}));

// Add to handler builder:
RequestHandlerBuilder::new(my_executor)
    .with_interceptor(limiter)
    .build()
```
Caller keys are derived from CallContext::caller_identity (set by auth
interceptors), the X-Forwarded-For header, or "anonymous". For advanced
use cases (sliding windows, distributed counters), implement a custom
ServerInterceptor or use a reverse proxy.
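The fixed-window algorithm itself is simple enough to sketch in full. This simplified, single-threaded version is for illustration only; `FixedWindow` is an invented type, and the real interceptor derives keys from caller identity as described above:

```rust
use std::collections::HashMap;

// Per-key counter reset at each window boundary (now / window_secs).
struct FixedWindow {
    requests_per_window: u32,
    window_secs: u64,
    counters: HashMap<String, (u64, u32)>, // key -> (window index, count)
}

impl FixedWindow {
    fn allow(&mut self, key: &str, now_secs: u64) -> bool {
        let window = now_secs / self.window_secs;
        let entry = self.counters.entry(key.to_string()).or_insert((window, 0));
        if entry.0 != window {
            *entry = (window, 0); // new window: reset the counter
        }
        if entry.1 < self.requests_per_window {
            entry.1 += 1;
            true
        } else {
            false // over the limit until the window rolls over
        }
    }
}

fn main() {
    let mut rl = FixedWindow {
        requests_per_window: 2,
        window_secs: 60,
        counters: HashMap::new(),
    };
    assert!(rl.allow("alice", 0));
    assert!(rl.allow("alice", 10));
    assert!(!rl.allow("alice", 20)); // third request in the same window
    assert!(rl.allow("bob", 20));    // independent per-caller counter
    assert!(rl.allow("alice", 65));  // next window
}
```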
Interceptor Chain
Both client and server support ordered interceptor chains. The chain is built incrementally:
```rust
// Each .with_interceptor() call appends to the chain
builder
    .with_interceptor(first)  // Runs first on request, last on response
    .with_interceptor(second) // Runs second on request, second-to-last on response
    .with_interceptor(third)  // Runs third on request, first on response
```
Next Steps
- Task & Config Stores — Pluggable storage backends
- Production Hardening — Security and reliability
Task & Config Stores
a2a-rust uses pluggable storage backends for tasks and push notification configs. The built-in in-memory stores work for development and testing. For production, implement the traits for your database.
TaskStore Trait
The TaskStore trait defines how tasks are persisted:
```rust
pub trait TaskStore: Send + Sync + 'static {
    fn save(&self, task: Task)
        -> Pin<Box<dyn Future<Output = A2aResult<()>> + Send + '_>>;

    fn get(&self, id: &TaskId)
        -> Pin<Box<dyn Future<Output = A2aResult<Option<Task>>> + Send + '_>>;

    fn list(&self, params: &ListTasksParams)
        -> Pin<Box<dyn Future<Output = A2aResult<TaskListResponse>> + Send + '_>>;

    fn insert_if_absent(&self, task: Task)
        -> Pin<Box<dyn Future<Output = A2aResult<bool>> + Send + '_>>;

    fn delete(&self, id: &TaskId)
        -> Pin<Box<dyn Future<Output = A2aResult<()>> + Send + '_>>;

    /// Returns the total number of tasks. Default returns 0.
    fn count(&self)
        -> Pin<Box<dyn Future<Output = A2aResult<u64>> + Send + '_>>;
}
```
InMemoryTaskStore
The default implementation with optional TTL and capacity limits:
```rust
use a2a_protocol_sdk::server::{InMemoryTaskStore, TaskStoreConfig};
use std::time::Duration;

// Default: 1hr TTL, 10k capacity
let store = InMemoryTaskStore::new();

// With custom limits
let store = InMemoryTaskStore::with_config(TaskStoreConfig {
    task_ttl: Some(Duration::from_secs(7200)), // 2 hour TTL
    max_capacity: Some(50_000),
    ..Default::default()
});
```
Features:
- Thread-safe (`DashMap`-based or `RwLock<HashMap>`)
- Automatic TTL eviction on access
- Capacity eviction (oldest first) when limit exceeded
- Pagination support with cursor tokens
- Filtering by `context_id`, `status`, and `timestamp`
SqliteTaskStore (feature-gated)
Enable the sqlite feature for a production-ready persistent store:
```toml
[dependencies]
a2a-protocol-server = { version = "0.2", features = ["sqlite"] }
```
```rust
use a2a_protocol_server::store::SqliteTaskStore;

let store = SqliteTaskStore::new("sqlite:tasks.db").await?;

// Or use an in-memory database for testing:
let store = SqliteTaskStore::new("sqlite::memory:").await?;
```
Features:
- Auto-creates schema on first use
- Stores tasks as JSON blobs with indexed `context_id` and `state` columns
- Cursor-based pagination via `id > ?` ordering
- Atomic `insert_if_absent` via `INSERT OR IGNORE`
- Upsert via `ON CONFLICT DO UPDATE`
TenantAwareInMemoryTaskStore
For multi-tenant deployments, use TenantAwareInMemoryTaskStore which provides full tenant isolation using tokio::task_local!:
```rust
use a2a_protocol_server::store::{TenantAwareInMemoryTaskStore, TenantContext};
use std::sync::Arc;

let store = Arc::new(TenantAwareInMemoryTaskStore::new());

// Each tenant gets an independent store instance.
// Use TenantContext::scope() to set the active tenant:
TenantContext::scope("tenant-alpha".to_string(), {
    let store = store.clone();
    async move {
        store.save(task).await.unwrap();
    }
}).await;

// Tasks saved under one tenant are invisible to others:
TenantContext::scope("tenant-beta".to_string(), {
    let store = store.clone();
    async move {
        let result = store.get(&task_id).await.unwrap();
        assert!(result.is_none()); // tenant-beta can't see tenant-alpha's task
    }
}).await;

// Track tenant count for capacity monitoring:
let count = store.tenant_count().await;
```
The TenantContext::scope() pattern uses tokio::task_local! to thread the tenant ID through the async call stack without passing it as a parameter. The RequestHandler automatically sets the tenant scope when params.tenant is populated.
TenantAwareSqliteTaskStore (feature-gated)
For persistent multi-tenant storage, enable the sqlite feature:
```rust
use a2a_protocol_server::store::TenantAwareSqliteTaskStore;

let store = TenantAwareSqliteTaskStore::new("sqlite:tasks.db").await?;
```
This variant partitions data by a tenant_id column instead of using task-local storage, making it suitable for production deployments where tenants may span multiple server instances.
> Note: Corresponding `TenantAwareInMemoryPushConfigStore` and `TenantAwareSqlitePushConfigStore` variants exist for push notification config storage.
Custom Implementation
```rust
struct PostgresTaskStore {
    pool: sqlx::PgPool,
}

impl TaskStore for PostgresTaskStore {
    fn get<'a>(&'a self, id: &'a TaskId)
        -> Pin<Box<dyn Future<Output = A2aResult<Option<Task>>> + Send + 'a>>
    {
        Box::pin(async move {
            let row = sqlx::query_as("SELECT data FROM tasks WHERE id = $1")
                .bind(id.as_ref())
                .fetch_optional(&self.pool)
                .await
                .map_err(|e| A2aError::internal(e.to_string()))?;
            Ok(row.map(|r| serde_json::from_value(r.data).unwrap()))
        })
    }

    // ... implement save, list, delete, insert_if_absent, count similarly
}
```
PushConfigStore Trait
The PushConfigStore trait manages push notification configurations:
```rust
pub trait PushConfigStore: Send + Sync + 'static {
    fn set(&self, config: TaskPushNotificationConfig)
        -> Pin<Box<dyn Future<Output = A2aResult<TaskPushNotificationConfig>> + Send + '_>>;

    fn get(&self, task_id: &str, id: &str)
        -> Pin<Box<dyn Future<Output = A2aResult<Option<TaskPushNotificationConfig>>> + Send + '_>>;

    fn list(&self, task_id: &str)
        -> Pin<Box<dyn Future<Output = A2aResult<Vec<TaskPushNotificationConfig>>> + Send + '_>>;

    fn delete(&self, task_id: &str, id: &str)
        -> Pin<Box<dyn Future<Output = A2aResult<()>> + Send + '_>>;
}
```
InMemoryPushConfigStore
The default implementation stores configs in a HashMap:
```rust
use a2a_protocol_sdk::server::InMemoryPushConfigStore;

let store = InMemoryPushConfigStore::new();
```
Features:
- Server-assigned config IDs (UUIDs)
- Per-task config limits (prevents abuse)
- Thread-safe access
Wiring Custom Stores
```rust
let handler = RequestHandlerBuilder::new(executor)
    .with_task_store(PostgresTaskStore::new(pool.clone()))
    .with_push_config_store(PostgresPushConfigStore::new(pool))
    .build()
    .unwrap();
```
Design Considerations
Object Safety
Both traits use Pin<Box<dyn Future>> return types for object safety. This allows the handler to store them as Arc<dyn TaskStore>.
Tenant Isolation
Tenant isolation uses tokio::task_local! via TenantContext::scope(), not method parameters. For in-memory stores, TenantAwareInMemoryTaskStore automatically partitions data by tenant. For SQL stores, TenantAwareSqliteTaskStore partitions by a tenant_id column. If you implement a custom store, use TenantContext::current_tenant() to retrieve the active tenant within the async call stack.
Pagination
The list method receives ListTasksParams with:
- `page_size` — Number of results per page (1-100, default 50)
- `page_token` — Opaque cursor for the next page
- Various filter fields
Your implementation should return a TaskListResponse with a next_page_token if more results exist.
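The cursor mechanics can be sketched over an in-memory slice. This is an illustrative stand-in for a custom store's `list`, with the opaque token simply being the last-returned id (mirroring the `id > ?` pattern used by the SQLite store); the types are simplified inventions:

```rust
// Return one page plus a next_page_token when more results remain.
fn list_page<'a>(
    sorted_ids: &'a [&'a str],
    page_size: usize,
    page_token: Option<&str>,
) -> (Vec<&'a str>, Option<String>) {
    // Resume strictly after the cursor.
    let start = match page_token {
        Some(tok) => sorted_ids.iter().position(|id| *id == tok).map_or(0, |i| i + 1),
        None => 0,
    };
    let page: Vec<&str> = sorted_ids[start..].iter().take(page_size).copied().collect();
    let next = if start + page.len() < sorted_ids.len() {
        page.last().map(|id| id.to_string())
    } else {
        None // no next_page_token on the final page
    };
    (page, next)
}

fn main() {
    let ids = ["t1", "t2", "t3", "t4", "t5"];

    let (page1, tok1) = list_page(&ids, 2, None);
    assert_eq!(page1, vec!["t1", "t2"]);
    assert_eq!(tok1.as_deref(), Some("t2"));

    let (page2, tok2) = list_page(&ids, 2, tok1.as_deref());
    assert_eq!(page2, vec!["t3", "t4"]);

    let (page3, tok3) = list_page(&ids, 2, tok2.as_deref());
    assert_eq!(page3, vec!["t5"]);
    assert!(tok3.is_none());
}
```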
Concurrency
Both traits require Send + Sync. Use connection pools, not single connections:
```rust
// Good
struct MyStore {
    pool: sqlx::PgPool,
}

// Bad — not Send + Sync
struct MyStore {
    conn: sqlx::PgConnection,
}
```
Next Steps
- Production Hardening — Deployment checklist
- Configuration Reference — All configuration options
Building a Client
The ClientBuilder creates an A2aClient configured for your target agent. It handles transport selection, timeouts, authentication, and interceptors.
Basic Client
```rust
use a2a_protocol_sdk::client::ClientBuilder;

let client = ClientBuilder::new("http://agent.example.com".to_string())
    .build()
    .expect("build client");
```
The builder auto-selects the transport based on the URL. To explicitly choose:
```rust
// Force JSON-RPC transport
let client = ClientBuilder::new("http://agent.example.com".to_string())
    .with_protocol_binding("JSONRPC")
    .build()
    .unwrap();

// Force REST transport
let client = ClientBuilder::new("http://agent.example.com".to_string())
    .with_protocol_binding("REST")
    .build()
    .unwrap();
```
Configuration Options
Timeouts
```rust
use std::time::Duration;

let client = ClientBuilder::new(url)
    .with_timeout(Duration::from_secs(60))                // Per-request timeout (default: 30s)
    .with_connection_timeout(Duration::from_secs(5))      // TCP connect timeout (default: 10s)
    .with_stream_connect_timeout(Duration::from_secs(15)) // SSE connect timeout (default: 30s)
    .build()
    .unwrap();
```
Output Modes
Specify which MIME types the client can handle:
```rust
let client = ClientBuilder::new(url)
    .with_accepted_output_modes(vec![
        "text/plain".into(),
        "application/json".into(),
        "image/png".into(),
    ])
    .build()
    .unwrap();
```
Default: ["text/plain", "application/json"]
History Length
Control how many historical messages are included in responses:
```rust
let client = ClientBuilder::new(url)
    .with_history_length(10) // Include last 10 messages
    .build()
    .unwrap();
```
Interceptors
Add request/response hooks:
```rust
let client = ClientBuilder::new(url)
    .with_interceptor(MyAuthInterceptor::new())
    .with_interceptor(LoggingInterceptor)
    .build()
    .unwrap();
```
Retry Policy
Enable automatic retries on transient failures (connection errors, timeouts, HTTP 429/502/503/504):
```rust
use a2a_protocol_client::RetryPolicy;

let client = ClientBuilder::new(url)
    .with_retry_policy(RetryPolicy::default()) // 3 retries, 500ms initial backoff
    .build()
    .unwrap();

// Custom retry configuration
let client = ClientBuilder::new(url)
    .with_retry_policy(
        RetryPolicy::default()
            .with_max_retries(5)
            .with_initial_backoff(Duration::from_secs(1))
            .with_max_backoff(Duration::from_secs(60))
            .with_backoff_multiplier(3.0),
    )
    .build()
    .unwrap();
```
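To make the custom configuration above concrete, here is the delay schedule it implies (1s initial backoff, x3 multiplier, 60s cap) as plain arithmetic. This is an illustration of exponential backoff in general, not the `RetryPolicy` internals, and it ignores any jitter the implementation may add:

```rust
// Delay before retry number `attempt + 1`, capped at max_ms.
fn backoff_ms(attempt: u32, initial_ms: u64, multiplier: f64, max_ms: u64) -> u64 {
    let raw = initial_ms as f64 * multiplier.powi(attempt as i32);
    (raw as u64).min(max_ms)
}

fn main() {
    // Delays before retries 1..=5: 1s, 3s, 9s, 27s, then capped at 60s.
    let delays: Vec<u64> = (0..5).map(|a| backoff_ms(a, 1_000, 3.0, 60_000)).collect();
    assert_eq!(delays, vec![1_000, 3_000, 9_000, 27_000, 60_000]);
}
```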
Return Immediately
For push notification workflows, return the task immediately without waiting for completion:
```rust
let client = ClientBuilder::new(url)
    .with_return_immediately(true)
    .build()
    .unwrap();
```
Builder Reference
| Method | Default | Description |
|---|---|---|
new(url) | — | Base URL of the agent (required) |
with_protocol_binding(str) | Auto-detect | Force transport: "JSONRPC", "REST", or "GRPC" |
with_custom_transport(impl Transport) | None | Use a custom transport (e.g., GrpcTransport) |
with_timeout(Duration) | 30s | Per-request timeout |
with_connection_timeout(Duration) | 10s | TCP connection timeout |
with_stream_connect_timeout(Duration) | 30s | SSE stream connect timeout |
with_retry_policy(RetryPolicy) | None | Retry on transient errors |
with_accepted_output_modes(Vec<String>) | ["text/plain", "application/json"] | MIME types the client handles |
with_history_length(u32) | None | Messages to include in responses |
with_return_immediately(bool) | false | Don't wait for task completion |
with_interceptor(impl CallInterceptor) | Empty chain | Add request/response hook |
Client Reuse (Best Practice)
Create clients once and reuse them across requests. Each A2aClient holds
a connection pool internally (via hyper), so reuse avoids repeated DNS
resolution, TCP handshakes, and TLS negotiation on every call.
```rust
// ✅ Good: build once, reuse across requests
struct MyOrchestrator {
    analyzer: A2aClient,
    builder: A2aClient,
}

impl MyOrchestrator {
    fn new(analyzer_url: &str, builder_url: &str) -> Self {
        Self {
            analyzer: ClientBuilder::new(analyzer_url).build().unwrap(),
            builder: ClientBuilder::new(builder_url)
                .with_protocol_binding("REST")
                .build()
                .unwrap(),
        }
    }

    async fn run(&self) {
        // Reuse the same client for every request
        let _ = self.analyzer.send_message(params).await;
        let _ = self.builder.send_message(params).await;
    }
}
```
```rust
// ❌ Avoid: rebuilding the client on every call
async fn bad_pattern(url: &str) {
    // This works but wastes resources — connection pool is discarded each time
    let client = ClientBuilder::new(url).build().unwrap();
    let _ = client.send_message(params).await;
}
```
gRPC Client
For gRPC transport, use GrpcTransport::connect() with with_custom_transport():
```rust
use a2a_protocol_client::GrpcTransport;

let transport = GrpcTransport::connect("http://agent.example.com:50051").await?;

let client = ClientBuilder::new("http://agent.example.com:50051")
    .with_custom_transport(transport)
    .build()?;
```
Configure with GrpcTransportConfig:
```rust
use a2a_protocol_client::transport::grpc::{GrpcTransport, GrpcTransportConfig};
use std::time::Duration;

let config = GrpcTransportConfig::default()
    .with_timeout(Duration::from_secs(60))
    .with_max_message_size(8 * 1024 * 1024);

let transport = GrpcTransport::connect_with_config(
    "http://agent.example.com:50051",
    config,
).await?;
```
Thread Safety
A2aClient is Send + Sync and can be shared across tasks via Arc:
```rust
use std::sync::Arc;

let client = Arc::new(
    ClientBuilder::new(url).build().unwrap()
);

// Share across async tasks
let c1 = Arc::clone(&client);
tokio::spawn(async move { c1.send_message(params1).await });

let c2 = Arc::clone(&client);
tokio::spawn(async move { c2.send_message(params2).await });
```
Next Steps
- Sending Messages — Synchronous message send
- Streaming Responses — Real-time event consumption
- Task Management — Querying and canceling tasks
Sending Messages
The most common operation: send a message to an agent and get a response.
Synchronous Send
send_message sends a message and waits for the task to complete:
```rust
use a2a_protocol_sdk::prelude::*;

let params = MessageSendParams {
    tenant: None,
    message: Message {
        id: MessageId::new(uuid::Uuid::new_v4().to_string()),
        role: MessageRole::User,
        parts: vec![Part::text("What is the capital of France?")],
        task_id: None,
        context_id: None,
        reference_task_ids: None,
        extensions: None,
        metadata: None,
    },
    configuration: None,
    metadata: None,
};

let response = client.send_message(params).await?;
```
Handling the Response
SendMessageResponse is an enum with two variants:
```rust
match response {
    SendMessageResponse::Task(task) => {
        println!("Task ID: {}", task.id);
        println!("Status: {:?}", task.status.state);

        // Extract text from artifacts
        if let Some(artifacts) = &task.artifacts {
            for artifact in artifacts {
                for part in &artifact.parts {
                    if let a2a_protocol_types::message::PartContent::Text { text } = &part.content {
                        println!("Result: {text}");
                    }
                }
            }
        }
    }
    SendMessageResponse::Message(msg) => {
        // Some agents respond with a direct message instead of a task
        println!("Direct message: {:?}", msg);
    }
    _ => {}
}
```
Configuration
Customize the send with SendMessageConfiguration:
```rust
use a2a_protocol_sdk::types::params::SendMessageConfiguration;

let params = MessageSendParams {
    tenant: None,
    message: make_message("Translate to French"),
    configuration: Some(SendMessageConfiguration {
        accepted_output_modes: vec!["text/plain".into()],
        task_push_notification_config: None,
        history_length: Some(5),         // Include last 5 messages
        return_immediately: Some(false), // Wait for completion
    }),
    metadata: None,
};
```
Continuing a Conversation
To continue a conversation, include the context_id from a previous task:
```rust
let first_response = client.send_message(MessageSendParams {
    message: Message {
        id: MessageId::new(uuid::Uuid::new_v4().to_string()),
        role: MessageRole::User,
        parts: vec![Part::text("Tell me about Rust")],
        task_id: None,
        context_id: None, // New conversation
        reference_task_ids: None,
        extensions: None,
        metadata: None,
    },
    tenant: None,
    configuration: None,
    metadata: None,
}).await?;

// Get the context ID from the first response
let context_id = if let SendMessageResponse::Task(task) = &first_response {
    Some(task.context_id.clone())
} else {
    None
};

// Continue the conversation
let follow_up = client.send_message(MessageSendParams {
    message: Message {
        id: MessageId::new(uuid::Uuid::new_v4().to_string()),
        role: MessageRole::User,
        parts: vec![Part::text("What about error handling?")],
        task_id: None,
        context_id: context_id.map(|c| ContextId::new(c.to_string())),
        reference_task_ids: None,
        extensions: None,
        metadata: None,
    },
    tenant: None,
    configuration: None,
    metadata: None,
}).await?;
```
Multi-Part Messages
Send messages with multiple content types:
```rust
let message = Message {
    id: MessageId::new(uuid::Uuid::new_v4().to_string()),
    role: MessageRole::User,
    parts: vec![
        Part::text("Analyze this image:"),
        Part::url("https://example.com/chart.png"),
        Part::data(serde_json::json!({
            "analysis_type": "detailed",
            "language": "en"
        })),
    ],
    task_id: None,
    context_id: None,
    reference_task_ids: None,
    extensions: None,
    metadata: None,
};
```
Next Steps
- Streaming Responses — Real-time event streams
- Task Management — Querying and canceling tasks
- Error Handling — Handling failures gracefully
Streaming Responses
For long-running tasks or when you want real-time progress, use stream_message to receive SSE events as the agent works.
Basic Streaming
```rust
let mut stream = client
    .stream_message(params)
    .await
    .expect("connect to stream");

while let Some(event) = stream.next().await {
    match event {
        Ok(StreamResponse::StatusUpdate(ev)) => {
            println!("Status: {:?}", ev.status.state);
        }
        Ok(StreamResponse::ArtifactUpdate(ev)) => {
            for part in &ev.artifact.parts {
                if let a2a_protocol_types::message::PartContent::Text { text } = &part.content {
                    print!("{text}");
                }
            }
            if ev.last_chunk == Some(true) {
                println!(); // Newline after final chunk
            }
        }
        Ok(StreamResponse::Task(task)) => {
            println!("Final: {:?}", task.status.state);
        }
        Ok(StreamResponse::Message(msg)) => {
            println!("Message: {:?}", msg);
        }
        Ok(_) => {
            // Future event types — handle gracefully
        }
        Err(e) => {
            eprintln!("Error: {e}");
            break;
        }
    }
}
```
Event Ordering
A typical stream delivers events in this order:
1. `StatusUpdate` → `Working`
2. `ArtifactUpdate` (one or more, potentially chunked)
3. `StatusUpdate` → `Completed` (or `Failed`)
4. Optionally, a final `Task` snapshot
Chunked Artifacts
Artifacts can be delivered in multiple chunks:
```rust
Ok(StreamResponse::ArtifactUpdate(ev)) => {
    let is_append = ev.append.unwrap_or(false);
    let is_last = ev.last_chunk.unwrap_or(false);

    if is_append {
        // Append to existing artifact
        buffer.push_str(&extract_text(&ev.artifact));
    } else {
        // New artifact or first chunk
        buffer = extract_text(&ev.artifact);
    }

    if is_last {
        println!("Complete artifact: {buffer}");
    }
}
```
Re-subscribing
If a stream disconnects, re-subscribe to get the latest state:
```rust
let mut stream = client
    .subscribe_to_task("task-abc")
    .await?;

// Continue processing events...
while let Some(event) = stream.next().await {
    // ...
}
```
Stream Timeouts
The client has separate timeouts for stream connections:
```rust
use std::time::Duration;

let client = ClientBuilder::new(url)
    .with_stream_connect_timeout(Duration::from_secs(15))
    .build()?;
```
The connect timeout applies to establishing the SSE connection. Once connected, the stream stays open until the server closes it or an error occurs.
Safety Limits
The SSE parser protects against resource exhaustion:
| Limit | Value | Purpose |
|---|---|---|
| Buffer cap | 16 MiB | Prevents OOM from oversized events |
| Connect timeout | 30s (default) | Fails fast on unreachable servers |
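The buffer cap works like this in principle: the accumulator refuses to grow past the limit instead of allocating unboundedly. A standalone sketch of that guard (illustrative only, not the SDK's internal parser):

```rust
const MAX_EVENT_BYTES: usize = 16 * 1024 * 1024; // 16 MiB cap

/// Append a chunk to the pending-event buffer, failing fast instead of
/// letting a malicious or broken server exhaust memory.
fn push_chunk(buf: &mut Vec<u8>, chunk: &[u8]) -> Result<(), String> {
    if buf.len() + chunk.len() > MAX_EVENT_BYTES {
        return Err(format!("SSE event exceeds {MAX_EVENT_BYTES}-byte limit"));
    }
    buf.extend_from_slice(chunk);
    Ok(())
}
```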
Next Steps
- Task Management — Querying tasks after streaming
- Error Handling — Handling stream failures
Task Management
Beyond sending messages, the client provides methods for querying, listing, and canceling tasks.
Get a Task
Retrieve a task by ID:
```rust
use a2a_protocol_sdk::types::params::TaskQueryParams;

let task = client.get_task(TaskQueryParams {
    tenant: None,
    id: "task-abc".into(),
    history_length: Some(10), // Include last 10 messages
}).await?;

println!("Task: {} ({:?})", task.id, task.status.state);

if let Some(artifacts) = &task.artifacts {
    println!("Artifacts: {}", artifacts.len());
}
if let Some(history) = &task.history {
    println!("Messages: {}", history.len());
}
```
List Tasks
Query tasks with filtering and pagination:
```rust
use a2a_protocol_sdk::types::params::ListTasksParams;

let response = client.list_tasks(ListTasksParams {
    tenant: None,
    context_id: Some("ctx-123".into()),  // Filter by context
    status: Some(TaskState::Completed),  // Filter by state
    page_size: Some(20),                 // 20 per page
    page_token: None,                    // First page
    status_timestamp_after: None,
    include_artifacts: Some(true),
    history_length: None,
}).await?;

for task in &response.tasks {
    println!("{}: {:?}", task.id, task.status.state);
}

// Paginate
if let Some(token) = &response.next_page_token {
    let next_page = client.list_tasks(ListTasksParams {
        page_token: Some(token.clone()),
        ..Default::default()
    }).await?;
}
```
Filtering Options
| Parameter | Description |
|---|---|
| `context_id` | Tasks in a specific conversation |
| `status` | Tasks in a specific state |
| `status_timestamp_after` | Tasks updated after a timestamp (ISO 8601) |
| `page_size` | Results per page (1-100, default 50) |
| `page_token` | Cursor for the next page |
| `include_artifacts` | Include artifact data in results |
| `history_length` | Number of messages per task |
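Draining every page is a simple cursor loop: call, collect, and repeat while a next-page token comes back. A standalone sketch with a stand-in fetch function (the real call would be `client.list_tasks` with `page_token`; `fetch_page` here is a hypothetical mock):

```rust
/// Stand-in for one list call: maps an optional cursor to a page of IDs
/// plus the cursor for the next page (None when exhausted).
fn fetch_page(token: Option<u32>) -> (Vec<u32>, Option<u32>) {
    let start = token.unwrap_or(0);
    let items: Vec<u32> = (start..(start + 2).min(5)).collect();
    let next = if start + 2 < 5 { Some(start + 2) } else { None };
    (items, next)
}

/// Follow the pagination cursor until the server stops returning one.
fn collect_all() -> Vec<u32> {
    let mut all = Vec::new();
    let mut token = None;
    loop {
        let (items, next) = fetch_page(token);
        all.extend(items);
        match next {
            Some(t) => token = Some(t),
            None => break,
        }
    }
    all
}
```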
Cancel a Task
Request cancellation of a running task:
```rust
let task = client.cancel_task("task-abc").await?;

println!("Task state: {:?}", task.status.state);
// → Canceled (if the agent supports cancellation)
```
Cancellation is cooperative — the agent's executor must implement the cancel method. If the agent doesn't support cancellation, you'll get an error response.
Cancellation States
| Current State | Can Cancel? |
|---|---|
| `Submitted` | Yes → `Canceled` |
| `Working` | Yes → `Canceled` (if agent supports it) |
| `InputRequired` | Yes → `Canceled` |
| `AuthRequired` | Yes → `Canceled` |
| `Completed` | No (terminal state) |
| `Failed` | No (terminal state) |
| `Canceled` | No (already canceled) |
| `Rejected` | No (terminal state) |
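The table reduces to a single predicate: cancellation is possible only from non-terminal states. A standalone sketch, using a local enum that mirrors the protocol's task states (not the library's actual `TaskState` type):

```rust
// Illustrative stand-in for the A2A task states.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum State {
    Submitted,
    Working,
    InputRequired,
    AuthRequired,
    Completed,
    Failed,
    Canceled,
    Rejected,
}

/// A task can be canceled only while it is in a non-terminal state.
fn can_cancel(state: State) -> bool {
    matches!(
        state,
        State::Submitted | State::Working | State::InputRequired | State::AuthRequired
    )
}
```

Checking this client-side before calling `cancel_task` avoids a guaranteed error round-trip for tasks already in a terminal state.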
Next Steps
- Error Handling — Handling API errors
- Streaming Responses — Real-time event streams
Error Handling
a2a-rust uses a layered error model: protocol-level errors (A2aError), client errors (ClientError), and server errors (ServerError).
A2aError (Protocol Level)
Protocol errors defined by the A2A spec:
```rust
use a2a_protocol_sdk::types::error::{A2aError, ErrorCode};

// Common error codes
ErrorCode::TaskNotFound          // Task doesn't exist
ErrorCode::TaskNotCancelable     // Agent doesn't support cancellation
ErrorCode::InvalidParams         // Bad request parameters
ErrorCode::MethodNotFound        // Unknown method
ErrorCode::InternalError         // Server-side failure
ErrorCode::UnsupportedOperation  // Not implemented
```
Handling Protocol Errors
```rust
match client.get_task(params).await {
    Ok(task) => println!("Got task: {}", task.id),
    Err(e) => {
        // Check the error type
        eprintln!("Error: {e}");
    }
}
```
Client Errors
The client wraps transport and protocol errors:
```rust
match client.send_message(params).await {
    Ok(response) => { /* handle response */ }
    Err(e) => {
        // Transport errors (network, timeout, etc.)
        // Protocol errors (task not found, invalid params, etc.)
        // Parse errors (malformed response)
        eprintln!("Client error: {e}");
    }
}
```
Timeout Errors
```rust
// Per-request timeout
let client = ClientBuilder::new(url)
    .with_timeout(Duration::from_secs(5))
    .build()?;

// This will error if the agent takes longer than 5 seconds
match client.send_message(params).await {
    Ok(response) => { /* success */ }
    Err(e) => {
        // Could be a timeout error
        eprintln!("Failed (possibly timeout): {e}");
    }
}
```
Connection Errors
```rust
// Connection timeout
let client = ClientBuilder::new(url)
    .with_connection_timeout(Duration::from_secs(2))
    .build()?;
```
Automatic Retries
Use RetryPolicy to automatically retry transient errors:
```rust
use a2a_protocol_client::RetryPolicy;

let client = ClientBuilder::new(url)
    .with_retry_policy(RetryPolicy::default())
    .build()?;
```
You can check if an error is retryable programmatically:
```rust
match client.send_message(params).await {
    Err(e) if e.is_retryable() => println!("Transient error: {e}"),
    Err(e) => println!("Permanent error: {e}"),
    Ok(resp) => { /* ... */ }
}
```
Retryable errors include: `Http`, `HttpClient`, `Timeout`, and `UnexpectedStatus` with codes 429, 502, 503, or 504.
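The status-code half of that classification is small enough to sketch directly. A standalone helper (illustrative, not the library's API) that mirrors the retryable set above:

```rust
/// Sketch of the retry classification: 429 (rate limited) and the
/// transient 5xx gateway codes are worth retrying; everything else is not.
fn is_retryable_status(code: u16) -> bool {
    matches!(code, 429 | 502 | 503 | 504)
}
```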
Server Errors
When building an agent, the ServerError type covers handler-level failures:
```rust
use a2a_protocol_sdk::server::ServerError;

// Server errors are returned by RequestHandlerBuilder::build()
// and by store/executor operations
```
Best Practices
Don't Panic
a2a-rust never panics in library code. All fallible operations return Result. Follow the same pattern in your executors:
```rust
// Good: return an error
return Err(A2aError::internal("processing failed".into()));

// Bad: panic
panic!("processing failed");
```
Executor Error Handling
In your AgentExecutor, catch errors and report them as status updates:
```rust
Box::pin(async move {
    queue.write(/* Working */).await?;

    match risky_operation().await {
        Ok(result) => {
            queue.write(/* ArtifactUpdate */).await?;
            queue.write(/* Completed */).await?;
        }
        Err(e) => {
            // Report failure through the protocol
            queue.write(StreamResponse::StatusUpdate(TaskStatusUpdateEvent {
                task_id: ctx.task_id.clone(),
                context_id: ContextId::new(ctx.context_id.clone()),
                status: TaskStatus {
                    state: TaskState::Failed,
                    message: Some(Message {
                        id: MessageId::new(uuid::Uuid::new_v4().to_string()),
                        role: MessageRole::Agent,
                        parts: vec![Part::text(&e.to_string())],
                        task_id: None,
                        context_id: None,
                        reference_task_ids: None,
                        extensions: None,
                        metadata: None,
                    }),
                    timestamp: None,
                },
                metadata: None,
            })).await?;
        }
    }
    Ok(())
})
```
Stream Error Recovery
For streaming, handle errors per-event:
```rust
while let Some(event) = stream.next().await {
    match event {
        Ok(ev) => { /* process event */ }
        Err(e) => {
            eprintln!("Stream error: {e}");
            // Decide: retry via resubscribe, or give up
            break;
        }
    }
}
```
Next Steps
- Testing Your Agent — Testing error scenarios
- Pitfalls & Lessons Learned — Common mistakes
Testing Your Agent
a2a-rust makes it easy to test agents at multiple levels: unit testing executors, integration testing with real HTTP, and property-based testing with fuzz targets.
Unit Testing Executors
Test your executor logic directly by creating a RequestContext and mock EventQueueWriter:
```rust
use a2a_protocol_sdk::prelude::*;
use a2a_protocol_server::streaming::event_queue::new_in_memory_queue;
use tokio_util::sync::CancellationToken;

#[tokio::test]
async fn test_calculator_executor() {
    let executor = CalcExecutor;

    // Create a writer/reader pair directly for unit testing
    let (writer, mut reader) = new_in_memory_queue();

    // Build the request context
    let ctx = RequestContext {
        task_id: TaskId::new("test-task"),
        context_id: "ctx-1".into(),
        message: Message {
            id: MessageId::new("msg-1"),
            role: MessageRole::User,
            parts: vec![Part::text("3 + 5")],
            task_id: None,
            context_id: None,
            reference_task_ids: None,
            extensions: None,
            metadata: None,
        },
        stored_task: None,
        metadata: None,
        cancellation_token: CancellationToken::new(),
    };

    // Run the executor
    executor.execute(&ctx, &*writer).await.unwrap();

    // Read events from the queue
    let events: Vec<_> = collect_events(&mut reader).await;

    // Verify: Working → ArtifactUpdate → Completed
    assert!(matches!(&events[0],
        StreamResponse::StatusUpdate(e) if e.status.state == TaskState::Working));
    assert!(matches!(&events[1],
        StreamResponse::ArtifactUpdate(e) if extract_text(&e.artifact) == "8"));
    assert!(matches!(&events[2],
        StreamResponse::StatusUpdate(e) if e.status.state == TaskState::Completed));
}
```
Integration Testing with HTTP
Test the full stack by starting a real server and using a client:
```rust
use a2a_protocol_sdk::server::{RequestHandlerBuilder, JsonRpcDispatcher};
use a2a_protocol_sdk::client::ClientBuilder;
use std::sync::Arc;

#[tokio::test]
async fn test_end_to_end() {
    // Build handler and server
    let handler = Arc::new(
        RequestHandlerBuilder::new(CalcExecutor).build().unwrap()
    );
    let dispatcher = Arc::new(JsonRpcDispatcher::new(handler));
    let addr = start_test_server(dispatcher).await;

    // Build client
    let client = ClientBuilder::new(format!("http://{addr}"))
        .build()
        .unwrap();

    // Send a message
    let response = client.send_message(MessageSendParams {
        tenant: None,
        message: Message {
            id: MessageId::new("test-msg"),
            role: MessageRole::User,
            parts: vec![Part::text("10 + 20")],
            task_id: None,
            context_id: None,
            reference_task_ids: None,
            extensions: None,
            metadata: None,
        },
        configuration: None,
        metadata: None,
    }).await.unwrap();

    // Verify
    if let SendMessageResponse::Task(task) = response {
        assert_eq!(task.status.state, TaskState::Completed);
    } else {
        panic!("expected task response");
    }
}

async fn start_test_server(
    dispatcher: Arc<JsonRpcDispatcher>,
) -> std::net::SocketAddr {
    let listener = tokio::net::TcpListener::bind("127.0.0.1:0")
        .await.unwrap();
    let addr = listener.local_addr().unwrap();

    tokio::spawn(async move {
        loop {
            let (stream, _) = listener.accept().await.unwrap();
            let io = hyper_util::rt::TokioIo::new(stream);
            let d = Arc::clone(&dispatcher);
            tokio::spawn(async move {
                let svc = hyper::service::service_fn(move |req| {
                    let d = Arc::clone(&d);
                    async move {
                        Ok::<_, std::convert::Infallible>(d.dispatch(req).await)
                    }
                });
                let _ = hyper_util::server::conn::auto::Builder::new(
                    hyper_util::rt::TokioExecutor::new(),
                ).serve_connection(io, svc).await;
            });
        }
    });

    addr
}
```
Testing Both Transports
Run the same tests against both JSON-RPC and REST:
```rust
#[tokio::test]
async fn test_jsonrpc_transport() {
    let addr = start_jsonrpc_server().await;
    let client = ClientBuilder::new(format!("http://{addr}")).build().unwrap();
    run_test_suite(&client).await;
}

#[tokio::test]
async fn test_rest_transport() {
    let addr = start_rest_server().await;
    let client = ClientBuilder::new(format!("http://{addr}"))
        .with_protocol_binding("REST")
        .build().unwrap();
    run_test_suite(&client).await;
}

async fn run_test_suite(client: &A2aClient) {
    // Test send_message, stream_message, get_task, etc.
}
```
Testing Streaming
```rust
#[tokio::test]
async fn test_streaming() {
    let addr = start_server().await;
    let client = ClientBuilder::new(format!("http://{addr}")).build().unwrap();

    let mut stream = client.stream_message(params).await.unwrap();

    let mut events = vec![];
    while let Some(event) = stream.next().await {
        events.push(event.unwrap());
    }

    // Verify event sequence
    assert!(events.len() >= 3); // Working + Artifact + Completed
}
```
Wire Format Tests
Verify JSON serialization matches the A2A spec:
```rust
#[test]
fn task_state_wire_format() {
    let status = TaskStatus::new(TaskState::Completed);
    let json = serde_json::to_string(&status).unwrap();
    assert!(json.contains("\"TASK_STATE_COMPLETED\""));
}

#[test]
fn message_role_wire_format() {
    let msg = Message {
        id: MessageId::new("m1"),
        role: MessageRole::User,
        parts: vec![Part::text("hi")],
        // ...
    };
    let json = serde_json::to_string(&msg).unwrap();
    assert!(json.contains("\"ROLE_USER\""));
    assert!(json.contains("\"messageId\""));
}
```
Fuzz Testing
The fuzz/ directory contains fuzz targets for JSON parsing:
# Requires nightly Rust
cd fuzz
cargo +nightly fuzz run fuzz_target
Fuzz testing helps find edge cases in JSON deserialization that unit tests miss.
Why No Single Test Type Is Enough
A key lesson from a2a-rust is that no single testing technique is sufficient, and neither is any combination that omits one of the layers below. Each layer catches a different class of bug, and the gaps between layers are where production incidents hide:
| Test type | What it proves | What it cannot prove |
|---|---|---|
| Unit tests | Individual functions return correct values | That calling code uses those values correctly |
| Integration tests | Components work together pairwise | Multi-hop and emergent system behavior |
| Property tests | Invariants hold for all generated inputs | That real-world inputs exercise those invariants |
| Fuzz tests | Parser doesn't crash on malformed input | Semantic correctness of valid input handling |
| E2E dogfooding | The full stack works under realistic conditions | That your assertions actually detect regressions |
| Mutation tests | Your assertions detect real code changes | Protocol-level emergent behavior |
The a2a-rust experience: After building 1,200+ unit/integration/property/fuzz tests, an exhaustive 72-test E2E dogfood suite that caught 36 real bugs across 8 passes, and achieving full green CI — mutation testing still found gaps. Tests that looked comprehensive were silently missing assertions on return values, boundary conditions, and delegation correctness. The suite was green, but mutants survived because no test verified the specific behavior being mutated.
This is why mutation testing is a required quality gate: it is the only technique that measures test effectiveness rather than test existence. Every other technique answers "does the code work?" — mutation testing answers "would the tests catch it if the code broke?"
Mutation Testing
Mutation testing is the final, critical layer of test quality assurance. While unit tests verify correctness and fuzz tests find edge cases, mutation testing answers a fundamentally different question: do your tests actually detect real bugs?
A mutant is a small, deliberate code change — replacing + with -, flipping
true to false, returning a default value instead of a computed one. If the
test suite still passes after a mutation, there is a gap: a real bug in that
exact location would go undetected.
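As a concrete illustration (not taken from the a2a-rust codebase), here is a function, the operator a mutant would flip, and the kind of assertion that kills it:

```rust
/// Computes the exclusive end index of a page. A mutant might replace `+`
/// with `-` or `min` with `max`; a test that only checks the result is
/// non-negative survives both, while tests pinning exact values kill them.
fn page_end(start: usize, page_size: usize, total: usize) -> usize {
    (start + page_size).min(total)
}
```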
Why This Matters at Scale
At multi-data-center deployment scales, the bugs that slip through traditional testing are precisely the kind that mutation testing catches:
- Off-by-one errors in pagination, retry logic, and timeout calculations
- Swapped operands in status comparisons (e.g., `==` vs `!=` on task state)
- Missing boundary checks where a default return looks plausible
- Dead code paths where a branch is never exercised by any test
These are the subtle, semantic correctness issues that only manifest under load, across network partitions, or during multi-hop agent orchestration — exactly the conditions that are hardest to reproduce in staging.
What Mutation Testing Found in a2a-rust
Even with 1,255 passing tests, 72 E2E dogfood tests, property tests, and fuzz targets — all green — the first mutation testing run surfaced gaps across every crate:
- Delegation methods returning `()` instead of forwarding calls (e.g., `Arc<T>` metrics delegation, OTel instrument recording)
- Hash function correctness — replacing `^=` with `|=` in FNV-1a was undetected because no test verified specific hash values
- Date arithmetic — swapping `/` with `%` or `*` in HTTP date formatting was undetected because the only test used epoch (where all fields are 0)
- Rate limiter logic — replacing `>` with `>=` in window checks, `&&` with `||` in cleanup conditions, and `/` with `%` in window calculations
- Builder patterns — `builder()` returning `Default::default()` instead of a functional builder was undetected because existing tests used the built result without verifying builder-specific behavior
- Debug formatting — `fmt` returning `Ok(Default::default())` (empty string) instead of the real debug output
- Cancellation token — `is_cancelled()` returning a hardcoded `true` or `false` instead of delegating to the actual token
Every one of these mutations represents a real bug that could have been introduced without any test catching it. The fix in each case was straightforward: add a test that asserts the specific behavior.
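The hash-function gap is easy to reproduce in isolation. A minimal standalone FNV-1a (64-bit) sketch: a test asserting only that two different inputs hash differently would likely survive the `^=` → `|=` mutation, while a test pinning known vectors kills it.

```rust
/// FNV-1a, 64-bit. The `^=` on each input byte is exactly the operator a
/// mutant flips to `|=` — undetectable unless a test pins concrete values.
fn fnv1a_64(data: &[u8]) -> u64 {
    let mut hash: u64 = 0xcbf2_9ce4_8422_2325; // FNV offset basis
    for &b in data {
        hash ^= u64::from(b);
        hash = hash.wrapping_mul(0x0000_0100_0000_01b3); // FNV prime
    }
    hash
}
```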
Running Mutation Tests
# Install cargo-mutants (one-time setup)
cargo install cargo-mutants
# Full mutation sweep (all library crates)
cargo mutants --workspace
# Test a specific crate
cargo mutants -p a2a-protocol-types
# Test a specific file
cargo mutants --file crates/a2a-types/src/task.rs
# Dry-run: list all mutants without running tests
cargo mutants --list --workspace
Configuration
Mutation testing is configured via mutants.toml at the workspace root:
# Which files to mutate
examine_globs = [
"crates/a2a-types/src/**/*.rs",
"crates/a2a-client/src/**/*.rs",
"crates/a2a-server/src/**/*.rs",
"crates/a2a-sdk/src/**/*.rs",
]
# Skip unproductive mutations (re-exports, generated code, formatting)
exclude_globs = ["**/mod.rs", "crates/*/src/proto/**"]
exclude_re = ["fmt$", "^tracing::", "^log::"]
CI Integration
- Nightly: A full mutation sweep runs every night. Any surviving mutant fails the build and is reported as a CI artifact.
- PR gate: An incremental sweep runs on changed files only, so PR feedback is fast while still enforcing zero surviving mutants on new/modified code.
Interpreting Results
Found 247 mutants to test
247 caught ✓ # Test suite detected the mutation
0 missed ✗ # ALERT: test gap — add or strengthen tests
3 unviable ⊘ # Mutation caused compile error (not a gap)
- Caught: The test suite correctly detected the mutation. Good.
- Missed: A real bug in this location would go undetected. Add tests.
- Unviable: The mutation produced a compile error. Not a test gap.
Target: 100% mutation score (zero missed mutants across all library crates).
Fixing Surviving Mutants
When a mutant survives, cargo mutants prints the exact source location and
mutation. For example:
MISSED: crates/a2a-types/src/task.rs:42: replace TaskState::is_terminal -> bool with false
This tells you that replacing the body of is_terminal() with false did not
cause any test to fail. The fix is to add a test that asserts is_terminal()
returns true for terminal states.
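A sketch of the mutation-killing test, using a stand-in enum rather than the real `TaskState` from `a2a-protocol-types`. Asserting both `true` and `false` cases catches the `-> false` mutant and its symmetric `-> true` counterpart:

```rust
// Stand-in mirroring the protocol's task states.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum TaskState { Submitted, Working, Completed, Failed, Canceled, Rejected }

impl TaskState {
    /// Terminal states admit no further transitions.
    fn is_terminal(self) -> bool {
        matches!(
            self,
            TaskState::Completed | TaskState::Failed
                | TaskState::Canceled | TaskState::Rejected
        )
    }
}
```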
Running the Test Suite
Current status: The workspace has 1,255 passing tests across all crates (unit, integration, property, fuzz, and E2E dogfood).
# All tests
cargo test --workspace
# Specific crate
cargo test -p a2a-protocol-server
# With output
cargo test --workspace -- --nocapture
# Specific test
cargo test test_calculator_executor
Next Steps
- Production Hardening — Preparing for deployment
- Pitfalls & Lessons Learned — Common testing mistakes
Dogfooding: The Agent Team Example
The best way to find bugs in an SDK is to use it yourself — under real conditions, with real complexity, exercising real interaction patterns. Unit tests verify individual functions. Integration tests verify pairwise contracts. But only dogfooding reveals the emergent issues that appear when all the pieces come together.
The agent-team example (examples/agent-team/) is a full-stack dogfood of every a2a-rust capability. It deploys 4 specialized agents that discover each other, delegate work, stream results, and report health — all via the A2A protocol. A comprehensive test suite of 72 E2E tests (79 with optional transports and signing) runs in ~2.5 seconds.
Why Dogfood?
Unit tests and integration tests are necessary but insufficient. No single test type is enough; it takes an ensemble of all of them to catch everything. Here's what each layer catches and misses:
| Testing level | What it catches | What it misses |
|---|---|---|
| Unit tests | Logic errors in isolated functions | Interaction bugs, serialization mismatches |
| Integration tests | Pairwise contracts between components | Multi-hop communication, emergent behavior |
| Property tests | Edge cases in data handling | Protocol flow issues, lifecycle bugs |
| Fuzz tests | Malformed input handling | Semantic correctness of valid flows |
| Dogfooding | DX issues, multi-hop bugs, performance surprises, missing features | Weak assertions, dead code paths |
| Mutation tests | Weak/missing assertions, dead code paths, off-by-one errors, swapped operands | Protocol-level emergent behavior |
The critical lesson: After building 1,255 tests across all of the above categories — unit, integration, property, fuzz, and 72 E2E dogfood tests that caught 36 real bugs — the entire suite was green. Every CI check passed. Then we ran mutation testing, and it found gaps in every crate. Tests that looked comprehensive were silently missing assertions on return values, boundary conditions, delegation correctness, and hash function specifics.
Mutation testing fills the gap between "tests pass" and "tests actually detect bugs." A test suite with 100% line coverage can still have a 0% mutation score if every assertion is trivial. Mutation testing is the only technique that directly measures test effectiveness rather than test existence. See Testing Your Agent — Mutation Testing for setup and usage.
Dogfooding operates at the highest level of the testing pyramid. It catches the class of bugs that live in the seams between components — bugs that only manifest when a real application exercises the full stack in realistic patterns. But even dogfooding cannot verify that your assertions are strong enough to catch regressions — that's where mutation testing completes the picture.
The Agent Team Architecture
┌─────────────────────────────────────────────────────────────┐
│ E2E Test Harness │
│ (72 tests, ~2500ms total) │
└─────┬───────────┬───────────┬───────────┬───────────────────┘
│ │ │ │
▼ ▼ ▼ ▼
┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐
│ Code │ │ Build │ │ Health │ │ Coordin- │
│ Analyzer │ │ Monitor │ │ Monitor │ │ ator │
│ JSON-RPC │ │ REST │ │ JSON-RPC │ │ REST │
└──────────┘ └──────────┘ └──────────┘ └─────┬────┘
│ A2A
┌──────────────────┼──────────────┐
│ │ │
▼ ▼ ▼
CodeAnalyzer BuildMonitor HealthMonitor
(send_message) (send_message) (send_message)
│
┌────┴────┐
▼ ▼
list_tasks list_tasks
(all agents) (all agents)
Each agent exercises different SDK capabilities:
| Agent | Transport | Capabilities Exercised |
|---|---|---|
| CodeAnalyzer | JSON-RPC | Streaming artifacts with append mode, multi-part output (text + JSON data), cancellation token checking |
| BuildMonitor | REST | Full task lifecycle (Completed/Failed/Canceled), streaming phase output, cancel executor override, push notification support |
| HealthMonitor | JSON-RPC | Multi-part input (text + data), agent-to-agent discovery via list_tasks, push notification support |
| Coordinator | REST | A2A client calls to other agents, result aggregation, multi-level orchestration |
SDK Features Exercised
The agent team exercises 35+ distinct SDK features in a single run:
- `AgentExecutor` trait (4 implementations)
- `RequestHandlerBuilder` (all options: timeout, queue capacity, max streams, metrics, interceptors)
- `JsonRpcDispatcher` and `RestDispatcher`
- `WebSocketDispatcher` (with `websocket` feature flag)
- `ClientBuilder` (both JSON-RPC and REST protocol bindings)
- Synchronous `SendMessage` and streaming `SendStreamingMessage`
- `EventStream` consumer (SSE event loop)
- `GetTask` and `ListTasks` with pagination and status filtering
- `CancelTask` with custom executor override
- Push notification config CRUD (`set_push_config`, `list_push_configs`)
- `HttpPushSender` delivery with webhook receiver
- `ServerInterceptor` (audit logging + bearer token auth checking)
- Custom `Metrics` observer (request/response/error counting)
- `AgentCard` discovery (both transports)
- Multi-part messages (`Part::text` + `Part::data`)
- Artifact streaming with `append` and `last_chunk` flags
- All `TaskState` transitions (Submitted, Working, Completed, Failed, Canceled)
- `CancellationToken` cooperative checking
- `return_immediately` mode
- Request metadata passthrough
- Context ID continuation across messages
- Concurrent GetTask during active streams
- `SubscribeToTask` resubscribe (both REST and JSON-RPC)
- `boxed_future` and `EventEmitter` helpers
- Concurrent streams on same agent
- `history_length` configuration
- `Part::file_bytes` (binary file content)
- `TenantAwareInMemoryTaskStore` isolation
- `TenantContext::scope` task-local threading
- WebSocket transport (`SendMessage` + streaming) — with `websocket` feature
- Batch JSON-RPC (single, multi, empty, mixed, streaming rejection)
- Real auth rejection (interceptor short-circuit)
- `GetExtendedAgentCard` via JSON-RPC
- `DynamicAgentCardHandler` (runtime-generated cards)
- Agent card HTTP caching (ETag + 304 Not Modified)
- Backpressure / lagged event queue (capacity=2)
Modular Example Structure
examples/agent-team/src/
├── main.rs # Thin orchestrator (~400 lines)
├── executors/
│ ├── mod.rs # Re-exports
│ ├── code_analyzer.rs # CodeAnalyzer executor
│ ├── build_monitor.rs # BuildMonitor executor
│ ├── health_monitor.rs # HealthMonitor executor
│ └── coordinator.rs # Coordinator executor (A2A client calls)
├── cards.rs # Agent card builders
├── helpers.rs # Shared helpers (make_send_params, EventEmitter)
├── infrastructure.rs # Metrics, interceptors, webhook, server setup
└── tests/
├── mod.rs # TestResult, TestContext
├── basic.rs # Tests 1-10: core send/stream paths
├── lifecycle.rs # Tests 11-20: orchestration, cancel, agent cards
├── edge_cases.rs # Tests 21-30: errors, concurrency, metrics
├── stress.rs # Tests 31-40: stress, durability, event ordering
├── dogfood.rs # Tests 41-50: SDK gaps, regressions, edge cases
├── transport.rs # Tests 51-58: WebSocket, gRPC, multi-tenancy
└── coverage_gaps.rs # Tests 61-79: Batch JSON-RPC, auth, cards, caching, backpressure, signing
Running the Agent Team
```bash
# Basic run (all output to stdout)
cargo run -p agent-team

# With WebSocket tests (tests 51-52)
cargo run -p agent-team --features websocket

# With structured logging
RUST_LOG=debug cargo run -p agent-team --features tracing

# With all optional features
cargo run -p agent-team --features "websocket,tracing"
```
Expected output:
```text
╔══════════════════════════════════════════════════════════════╗
║     A2A Agent Team — Full SDK Dogfood & E2E Test Suite       ║
╚══════════════════════════════════════════════════════════════╝
Agent [CodeAnalyzer]  JSON-RPC on http://127.0.0.1:XXXXX
Agent [BuildMonitor]  REST     on http://127.0.0.1:XXXXX
Agent [HealthMonitor] JSON-RPC on http://127.0.0.1:XXXXX
Agent [Coordinator]   REST     on http://127.0.0.1:XXXXX
...72 tests...
║ Total: 72 | Passed: 72 | Failed: 0 | Time: ~2500ms
```
Lessons for Your Own Agents
- Test all four transports. JSON-RPC, REST, WebSocket, and gRPC have different serialization and framing paths. A bug in one may not exist in the others.
- Test multi-hop flows. Agent A calling Agent B is different from a client calling Agent A. The interaction patterns surface different bugs.
- Test failure paths explicitly. The agent team tests `TaskState::Failed` and `TaskState::Canceled` alongside `Completed`. Happy-path-only testing misses lifecycle bugs.
- Use real metrics and interceptors. They exercise code paths that exist in the handler but are invisible to pure request/response tests.
- Deploy multiple agents simultaneously. Concurrent servers with different configurations stress connection pooling, port binding, and resource cleanup in ways single-server tests cannot.
- Test `return_immediately` mode. Client config must actually propagate to the server — this was a real bug caught only by dogfooding.
- Test tenant isolation. Multi-tenancy bugs are subtle — same task IDs in different tenants should not collide.
Open Issues & Future Work — All Resolved
All architecture, ergonomics, observability, performance, and durability issues from passes 1–8 have been resolved (36 bugs across 8 passes). All proposed beyond-spec features have been implemented:
| Feature | Location |
|---|---|
| OpenTelemetry integration | crates/a2a-server/src/otel/ — OtelMetrics, OtelMetricsBuilder, init_otlp_pipeline (otel feature) |
| Connection pooling metrics | crates/a2a-server/src/metrics.rs — ConnectionPoolStats, on_connection_pool_stats |
| Hot-reload agent cards | crates/a2a-server/src/agent_card/hot_reload.rs — file polling + SIGHUP reload |
| Store migration tooling | crates/a2a-server/src/store/migration.rs — MigrationRunner, V1–V3 migrations |
| Per-tenant configuration | crates/a2a-server/src/tenant_config.rs — PerTenantConfig, TenantLimits |
| TenantResolver trait | crates/a2a-server/src/tenant_resolver.rs — header, bearer token, path segment strategies |
| Agent card signing E2E | examples/agent-team/src/tests/coverage_gaps.rs — test 79 (signing feature) |
Sub-pages
- Bugs Found & Fixed — All 36 bugs discovered across eight dogfooding passes
- Test Coverage Matrix — Complete 72-test E2E coverage map (79 with optional transports and signing)
See Also
- Testing Your Agent — Unit and integration testing patterns
- Production Hardening — Preparing for deployment
- Pitfalls & Lessons Learned — Common mistakes
Dogfooding: Bugs Found & Fixed
Eight dogfooding passes across v0.1.0 and v0.2.0 uncovered 36 real bugs that 1,255 unit tests, integration tests, property tests, and fuzz tests did not catch. All 36 have been fixed.
Summary
| Pass | Focus | Bugs | Severity |
|---|---|---|---|
| Pass 1 | Initial dogfood | 3 | 2 Medium, 1 Low |
| Pass 2 | Hardening audit | 6 | 1 High, 2 Medium, 3 Low |
| Pass 3 | Stress testing | 1 | 1 High |
| Pass 4 | SDK regressions | 3 | 2 Critical, 1 Medium |
| Pass 5 | Concurrency | 4 | 2 High, 1 Medium, 1 Low |
| Pass 6 | Architecture | 5 | 1 Critical, 1 High, 3 Medium |
| Pass 7 | Deep dogfood | 9 | 1 Critical, 2 High, 4 Medium, 2 Low |
| Pass 8 | Performance & security | 5 | 1 Critical, 3 Medium, 1 Low |
By Severity
| Severity | Count | Examples |
|---|---|---|
| Critical | 5 | Timeout retry broken (#32), push config DoS (#26), JSON-RPC list param mismatch (#11), placeholder URLs (#12, #18) |
| High | 6 | Concurrent SSE (#9), return_immediately ignored (#10), TOCTOU race (#15), SSRF bypass (#25) |
| Medium | 16 | REST field stripping (#1), query encoding (#19), path traversal (#35) |
| Low | 9 | Metrics hooks (#2, #6, #7), gRPC error context (#36) |
Configuration Hardening
Extracted all hardcoded constants into configurable structs during passes 2-7:
| Struct | Fields | Where |
|---|---|---|
| `DispatchConfig` | `max_request_body_size`, `body_read_timeout`, `max_query_string_length` | Both dispatchers |
| `PushRetryPolicy` | `max_attempts`, `backoff` | `HttpPushSender` |
| `HandlerLimits` | `max_id_length`, `max_metadata_size`, `max_cancellation_tokens`, `max_token_age` | `RequestHandler` |
Pass 1: Initial Dogfood (3 bugs)
Bug 1: REST Transport Strips Required Fields from Push Config Body
Severity: Medium | Component: Client REST transport + Server dispatch
The client's REST transport extracts path parameters from the serialized JSON body to interpolate URL templates. For CreateTaskPushNotificationConfig, the route is /tasks/{taskId}/pushNotificationConfigs, so the transport extracts taskId from the body and removes it. But the server handler requires taskId in the body.
Why tests missed it: Unit tests test client transport and server dispatch in isolation. The bug only appears when they interact.
Fix: Server-side handle_set_push_config injects taskId from the URL path back into the body before parsing.
Bug 2: on_response Metrics Hook Never Called
Severity: Low | Component: RequestHandler
Metrics::on_response showed 0 responses after 17 successful requests. The hook was defined but never called in any handler method.
Fix: Added on_request()/on_response() calls to all 10 handler methods.
Bug 3: Protocol Binding Mismatch Produces Confusing Errors
Severity: Low | Component: Client JSON-RPC transport
When a JSON-RPC client called a REST-only server, the error was an opaque parsing failure rather than "wrong protocol binding."
Fix: (1) New ClientError::ProtocolBindingMismatch variant, (2) JSON-RPC transport detects non-JSON-RPC responses, (3) HealthMonitor uses agent card discovery to select correct transport.
Pass 2: Hardening Audit (6 bugs)
Bug 4: list_push_configs REST Response Format Mismatch
Severity: Medium | Component: Both dispatchers
Dispatchers serialized results as raw Vec<TaskPushNotificationConfig>, but the client expected ListPushConfigsResponse { configs, next_page_token }.
Fix: Both dispatchers wrap results in ListPushConfigsResponse.
Bug 5: Push Notification Test Task ID Mismatch
Severity: Medium | Component: Test design
Push config was registered on Task A, but a subsequent send_message created Task B. No config existed for Task B.
Fix: Restructured as push config CRUD lifecycle test.
Bug 6: on_error Metrics Hook Never Fired
Severity: Low | Component: RequestHandler
All handler error paths used ? to propagate without invoking on_error.
Fix: All 10 handler methods restructured with async block + match on on_response/on_error.
Bug 7: on_queue_depth_change Metrics Hook Never Fired
Severity: Low | Component: EventQueueManager
EventQueueManager had no access to the Metrics object.
Fix: Added Arc<dyn Metrics> to EventQueueManager, wired from builder.
Bug 8: JsonRpcDispatcher Does Not Serve Agent Cards
Severity: Medium | Component: JsonRpcDispatcher
resolve_agent_card() failed for JSON-RPC agents because only RestDispatcher served /.well-known/agent.json.
Fix: Added StaticAgentCardHandler to JsonRpcDispatcher.
Bug 9: SubscribeToTask Fails with Concurrent SSE Streams
Severity: High | Component: EventQueueManager
mpsc channels allow only a single reader. Once stream_message took the reader, subscribe_to_task failed with "no active event queue."
Fix: Redesigned from mpsc to tokio::sync::broadcast channels. subscribe() creates additional readers from the same sender.
Pass 3: Stress Testing (1 bug)
Bug 10: Client Ignores return_immediately Config
Severity: High | Component: Client send_message()
ClientBuilder::with_return_immediately(true) stored the flag in ClientConfig but send_message() never injected it into MessageSendParams.configuration. The server never saw the flag, so tasks always ran to completion.
Why tests missed it: The server-side return_immediately logic was correct. The bug was in client-to-server config propagation — a seam that only E2E testing exercises.
Fix: Added apply_client_config() that merges client-level return_immediately, history_length, and accepted_output_modes into params before sending. Per-request values take precedence over client defaults.
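The precedence rule the fix implements — per-request values win, client-level defaults fill the gaps — can be sketched with plain `Option` merging. The struct and field names below are illustrative, not the SDK's actual types:

```rust
// Illustrative sketch of client-default merging. `SendConfig` stands in
// for the SDK's per-request configuration; field names are assumptions.
#[derive(Clone, Default, Debug)]
struct SendConfig {
    return_immediately: Option<bool>,
    history_length: Option<u32>,
}

fn apply_client_config(request: SendConfig, client_default: &SendConfig) -> SendConfig {
    SendConfig {
        // Option::or keeps the per-request value when present,
        // otherwise falls back to the client-level default.
        return_immediately: request.return_immediately.or(client_default.return_immediately),
        history_length: request.history_length.or(client_default.history_length),
    }
}

fn main() {
    let client = SendConfig { return_immediately: Some(true), history_length: Some(10) };
    // The request overrides history_length but inherits return_immediately.
    let req = SendConfig { return_immediately: None, history_length: Some(3) };
    let merged = apply_client_config(req, &client);
    assert_eq!(merged.return_immediately, Some(true));
    assert_eq!(merged.history_length, Some(3));
}
```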
Pass 4: SDK Regression Testing (3 bugs)
Bug 11: JSON-RPC ListTaskPushNotificationConfigs Param Type Mismatch
Severity: Critical | Component: JsonRpcDispatcher
The JSON-RPC dispatcher parsed ListTaskPushNotificationConfigs params as TaskIdParams (field id), but the client sends ListPushConfigsParams (field task_id). This caused silent deserialization failure — push config listing via JSON-RPC was completely broken. REST worked because it uses path-based routing.
Why tests missed it: Previous push config tests used REST transport or tested create/get/delete but not list. The JSON-RPC list path was never exercised end-to-end.
Fix: Changed parse_params::<TaskIdParams> to parse_params::<ListPushConfigsParams> and p.id to p.task_id in jsonrpc.rs.
Bug 12: Agent Card URLs Set to "http://placeholder"
Severity: Critical | Component: Agent-team example
Agent cards were constructed with code_analyzer_card("http://placeholder") before the server bound to a port. The actual address was only known after TcpListener::bind(). This meant /.well-known/agent.json served a card with a URL that didn't match the actual server address.
Why tests missed it: Tests used URLs from TestContext (the real bound addresses), not from the agent card. Only resolve_agent_card() tests would have caught this, and those didn't exist.
Fix: Introduced bind_listener() that pre-binds TCP listeners to get addresses before handler construction. Cards are now built with correct URLs.
Bug 13: Push Notification Event Classification Broken
Severity: Medium | Component: Agent-team webhook receiver
The webhook receiver classified events by checking value.get("status") and value.get("artifact"), but StreamResponse serializes as {"statusUpdate": {...}} / {"artifactUpdate": {...}} (camelCase variant names). All push events were classified as "Unknown".
Fix: Check statusUpdate/artifactUpdate/task instead.
Pass 5: Hardening & Concurrency Audit (4 bugs)
Bug 14: Lock Poisoning Silently Masked in InMemoryCredentialsStore
Severity: High | Component: InMemoryCredentialsStore
All three CredentialsStore methods (get, set, remove) used .ok()? or if let Ok(...) to silently ignore RwLock poisoning. If a thread panicked while holding the lock, subsequent calls would return None (for get) or silently skip the operation (for set/remove), masking the underlying bug.
Why tests missed it: Tests don't exercise lock poisoning because #[test] functions that panic abort the test, not the lock.
Fix: Changed all three methods to .expect("credentials store lock poisoned") for fail-fast behavior. Added documentation explaining the poisoning semantics.
Bug 15: Rate Limiter TOCTOU Race on Window Advance
Severity: High | Component: RateLimitInterceptor
When two concurrent requests arrived at a window boundary, both could see the old window_start, and both would store count = 1 for the new window. This let 2N requests through per window instead of N.
The race: Thread A loads window_start (old), Thread B loads window_start (old), Thread A stores new window_start + count=1, Thread B stores new window_start + count=1 (clobbering A's count).
Why tests missed it: The single-threaded test executor doesn't interleave atomic operations. The race only manifests under real concurrent load.
Fix: Replaced the non-atomic load-check-store sequence with a compare_exchange (CAS) loop. Only one thread wins the CAS to reset the window; others retry and see the updated window on the next iteration.
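The shape of the CAS loop can be sketched with std atomics — this is a minimal illustration of the pattern, not the SDK's `RateLimitInterceptor` internals, and it elides details like per-caller buckets:

```rust
use std::sync::atomic::{AtomicU64, Ordering};

// Illustrative fixed-window counter. Only one thread wins the
// compare_exchange to advance the window; losers retry the loop.
struct Window {
    window_start: AtomicU64, // start time of the current window (e.g. millis)
    count: AtomicU64,        // requests seen in the current window
}

impl Window {
    fn check(&self, now: u64, window_len: u64, limit: u64) -> bool {
        loop {
            let start = self.window_start.load(Ordering::Acquire);
            if now < start + window_len {
                // Inside the current window: count atomically.
                return self.count.fetch_add(1, Ordering::AcqRel) < limit;
            }
            // Window expired: exactly one thread resets it via CAS.
            match self
                .window_start
                .compare_exchange(start, now, Ordering::AcqRel, Ordering::Acquire)
            {
                Ok(_) => {
                    self.count.store(1, Ordering::Release);
                    return true;
                }
                Err(_) => continue, // lost the race; re-read the new window
            }
        }
    }
}

fn main() {
    let w = Window { window_start: AtomicU64::new(0), count: AtomicU64::new(0) };
    assert!(w.check(10, 100, 2));   // first request in window
    assert!(w.check(20, 100, 2));   // second request, still under limit
    assert!(!w.check(30, 100, 2));  // limit reached
    assert!(w.check(200, 100, 2));  // new window: counter reset
}
```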
Bug 16: Rate Limiter Unbounded Bucket Growth
Severity: Medium | Component: RateLimitInterceptor
The buckets HashMap grew without bound. Each unique caller key created a CallerBucket that was never removed, even after the caller departed. In a service with many transient callers (e.g., serverless functions), this would leak memory indefinitely.
Why tests missed it: Tests use a small fixed set of callers. The leak only manifests with high caller churn over time.
Fix: Added amortized stale-bucket cleanup (every 256 check() calls). Buckets whose window_start is more than one window old are evicted.
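The amortized cleanup pattern looks roughly like this — a hypothetical sketch where buckets are reduced to their `window_start` timestamps; the real `CallerBucket` carries more state:

```rust
use std::collections::HashMap;

// Illustrative amortized eviction: every CLEANUP_INTERVAL calls, drop
// buckets whose window started more than one window ago. Names are
// assumptions mirroring the description, not the SDK's API.
const CLEANUP_INTERVAL: u64 = 256;

struct Limiter {
    buckets: HashMap<String, u64>, // caller key -> window_start
    calls: u64,
    window_len: u64,
}

impl Limiter {
    fn check(&mut self, key: &str, now: u64) {
        self.buckets.insert(key.to_string(), now);
        self.calls += 1;
        if self.calls % CLEANUP_INTERVAL == 0 {
            let cutoff = now.saturating_sub(self.window_len);
            // Keep only buckets whose window is still recent.
            self.buckets.retain(|_, start| *start >= cutoff);
        }
    }
}

fn main() {
    let mut l = Limiter { buckets: HashMap::new(), calls: 0, window_len: 100 };
    l.check("stale-caller", 0);
    // 255 more calls push us to the 256th check, triggering cleanup at now=1000.
    for i in 0..255 {
        l.check(&format!("k{i}"), 1_000);
    }
    assert!(!l.buckets.contains_key("stale-caller")); // evicted
}
```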
Bug 17: No Protocol Version Compatibility Warning
Severity: Low | Component: ClientBuilder
ClientBuilder::from_card() silently accepted any protocol_version from the agent card, including incompatible versions (e.g., "2.0.0" when the client supports "1.x"). Users would only discover the mismatch through obscure deserialization errors.
Fix: Added protocol version major-version check in from_card(). When the agent's major version differs from the client's supported version, a tracing::warn! is emitted.
Pass 6: Architecture Audit (5 bugs)
Bug 18: gRPC Agent Card Still Uses Placeholder URL
Severity: Critical | Component: Agent-team example (gRPC path)
The gRPC CodeAnalyzer agent was still constructed with grpc_analyzer_card("http://placeholder") — the exact same Bug #12 pattern that was fixed for HTTP agents in Pass 4. The gRPC path was missed because it was behind a #[cfg(feature = "grpc")] gate.
Why tests missed it: gRPC tests used the address from serve_grpc() return value, not from the agent card. Agent card discovery tests only ran for HTTP agents.
Fix: Added GrpcDispatcher::serve_with_listener() to accept a pre-bound TcpListener. The agent-team example now pre-binds for gRPC the same way it does for HTTP, ensuring the agent card URL is correct from the start.
Bug 19: REST Transport Query Strings Not URL-Encoded
Severity: High | Component: Client REST transport
build_query_string() concatenated parameter values directly into query strings without percent-encoding. Values containing &, =, spaces, or other special characters would corrupt the query string, causing server-side parsing failures or parameter injection.
Why tests missed it: All test parameters used simple alphanumeric values that don't need encoding.
Fix: Added encode_query_value() implementing RFC 3986 percent-encoding for all non-unreserved characters. Both keys and values are now encoded.
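A minimal RFC 3986 percent-encoder for query values looks like this — everything outside the unreserved set (ALPHA / DIGIT / `-` / `.` / `_` / `~`) is encoded. This is a sketch of the technique; the SDK's `encode_query_value()` may differ in detail:

```rust
// Percent-encode a query value per RFC 3986: only unreserved characters
// pass through; every other byte becomes %XX.
fn encode_query_value(value: &str) -> String {
    let mut out = String::with_capacity(value.len());
    for byte in value.bytes() {
        match byte {
            b'A'..=b'Z' | b'a'..=b'z' | b'0'..=b'9' | b'-' | b'.' | b'_' | b'~' => {
                out.push(byte as char)
            }
            _ => out.push_str(&format!("%{byte:02X}")),
        }
    }
    out
}

fn main() {
    // '&', '=', and ' ' would otherwise corrupt the query string.
    assert_eq!(encode_query_value("a&b=c d"), "a%26b%3Dc%20d");
    assert_eq!(encode_query_value("task-1"), "task-1"); // unreserved: unchanged
}
```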
Bug 20: WebSocket Stream Termination Uses Fragile String Matching
Severity: Medium | Component: Client WebSocket transport
The WebSocket stream reader detected stream completion by checking text.contains("stream_complete"). This would false-positive on any payload containing that substring (e.g., a task whose output mentions "stream_complete") and would miss terminal status updates that don't contain that exact string.
Fix: Replaced with is_stream_terminal() that deserializes the JSON-RPC frame and checks for terminal task states (completed, failed, canceled, rejected) or the stream_complete sentinel in the result object.
Bug 21: Background Event Processor Silently Drops Store Write Failures
Severity: High | Component: Server RequestHandler background processor
In streaming mode, the background event processor called let _ = task_store.save(...).await; at 5 call sites, silently discarding any store errors. If the store failed (disk full, permission denied), task state would diverge: the event queue showed completion but the persistent store didn't record it.
Why tests missed it: In-memory stores don't fail. The bug only manifests with persistent backends under storage pressure.
Fix: All 5 sites now use if let Err(_e) = task_store.save(...).await { trace_error!(...) } to surface failures via structured logging.
Bug 22: Metadata Size Validation Bypass via Serialization Failure
Severity: Medium | Component: Server RequestHandler messaging
Metadata size was measured with serde_json::to_string(meta).map(|s| s.len()).unwrap_or(0). If serialization failed, the size defaulted to 0, bypassing the size limit entirely.
Fix: Changed unwrap_or(0) to map_err(|_| ServerError::InvalidParams("metadata is not serializable")), rejecting unserializable metadata outright.
Pass 7: Deep Dogfood (9 bugs)
Bug 23: Graceful Shutdown Hangs on Executor Cleanup
Severity: High | Component: RequestHandler
shutdown() called executor.on_shutdown().await with no timeout. If an executor's cleanup routine blocked indefinitely, the entire shutdown process would hang.
Fix: Both shutdown() and shutdown_with_timeout() now bound the executor cleanup call with a timeout (10 seconds for shutdown(), the provided timeout for shutdown_with_timeout()).
Bug 24: Push Notification Body Clone Per Retry Attempt
Severity: Medium | Component: HttpPushSender
body_bytes.clone() inside the retry loop allocated a full copy of the serialized event body for every retry attempt. For large events with 3 retries, this caused 3 unnecessary heap allocations.
Fix: Changed body_bytes from Vec<u8> to Bytes (reference-counted). Clone is now an atomic reference count increment instead of a memory copy.
Bug 25: Webhook URL Missing Scheme Validation
Severity: High | Component: HttpPushSender
validate_webhook_url() checked for private IPs and hostnames but did not validate the URL scheme. URLs like ftp://evil.com/hook or file:///etc/passwd bypassed all SSRF validation.
Fix: Added explicit scheme check requiring http:// or https://. Unknown schemes and schemeless URLs are now rejected with a descriptive error.
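The allow-list check is simple but easy to forget; a minimal sketch (not the SDK's actual `validate_webhook_url()`, which also does IP and DNS checks):

```rust
// Reject any webhook URL whose scheme is not http or https.
// Schemeless URLs fail the prefix test and are rejected too.
fn validate_scheme(url: &str) -> Result<(), String> {
    let lower = url.to_ascii_lowercase();
    if lower.starts_with("http://") || lower.starts_with("https://") {
        Ok(())
    } else {
        Err(format!("webhook URL must use http or https: {url}"))
    }
}

fn main() {
    assert!(validate_scheme("https://example.com/hook").is_ok());
    assert!(validate_scheme("ftp://evil.com/hook").is_err());
    assert!(validate_scheme("file:///etc/passwd").is_err());
    assert!(validate_scheme("example.com/hook").is_err()); // no scheme
}
```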
Bug 26: Push Config Store Unbounded Global Growth (DoS)
Severity: Critical | Component: InMemoryPushConfigStore
The per-task config limit (100) prevented excessive configs per task, but there was no global limit. An attacker could create millions of tasks with 100 configs each, exhausting memory.
Fix: Added max_total_configs field (default 100,000) with with_max_total_configs() builder. The global check runs before the per-task check in set().
Bug 27: gRPC Error Code Mapping Incomplete
Severity: Medium | Component: Client gRPC transport
Only 4 tonic status codes were mapped to A2A error codes. Unauthenticated, PermissionDenied, ResourceExhausted, DeadlineExceeded, Cancelled, and Unavailable all silently mapped to InternalError, losing semantic information.
Fix: Added explicit mappings for 6 additional gRPC status codes.
Bug 28: BuildMonitor Cancel Race Condition
Severity: Medium | Component: Agent-team example
BuildMonitorExecutor::cancel() unconditionally emitted TaskState::Canceled without checking if the task had already reached a terminal state. If the executor completed between the handler checking and the cancel arriving, this caused an invalid Completed → Canceled transition.
Fix: Added ctx.cancellation_token.is_cancelled() guard before emitting cancel status.
Bug 29: CodeAnalyzer Missing Cancellation Re-Check
Severity: Low | Component: Agent-team example
CodeAnalyzerExecutor only checked cancellation once (before artifact emission). Multiple artifact emissions happened without re-checking, meaning cancellation between artifacts was delayed.
Fix: Added cancellation re-check between the two artifact emission phases.
Bug 30: Accept Loop Breaks Kill Servers
Severity: Medium | Component: Agent-team infrastructure
All three server startup functions (serve_jsonrpc, serve_rest, start_webhook_server) used Err(_) => break in their accept loops. A single transient accept error (e.g., EMFILE) would permanently kill the server.
Fix: Changed to Err(_) => continue for JSON-RPC and REST servers, and eprintln! + continue for the webhook receiver.
Bug 31: Coordinator Silent Client Build Failure
Severity: Low | Component: Agent-team example
When ClientBuilder::build() failed for an agent URL, the error was silently discarded (if let Ok(client) = ...). If all agents failed, the coordinator would run with an empty client map, producing confusing "Unknown command" errors.
Fix: Changed to match with Err(e) => eprintln!(...) to log the failing agent name, URL, and error.
Pass 8: Performance & Security (5 bugs)
Bug 32: Timeout Errors Misclassified as Transport (CRITICAL)
Severity: Critical | Component: Client REST + JSON-RPC transports
Both RestTransport::execute_request() and JsonRpcTransport::execute_request() mapped tokio::time::timeout errors to ClientError::Transport. Since Transport is explicitly marked non-retryable in is_retryable(), timeouts never triggered retry logic — defeating the entire retry system for the most common transient failure mode.
Why tests missed it: Unit tests checked that timeouts produced errors, but never checked the error variant. The retry integration tests used simulated errors, not real timeouts.
Fix: Changed both transports to use ClientError::Timeout("request timed out"). Added exhaustive retryability classification tests.
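The classification boundary the fix restored can be sketched as follows — variant names follow this write-up, while the SDK's real `ClientError` has more variants:

```rust
// Illustrative retryability classification: timeouts are transient and
// retryable; hard transport failures are not.
enum ClientError {
    Transport(String),
    Timeout(String),
}

fn is_retryable(err: &ClientError) -> bool {
    match err {
        ClientError::Timeout(_) => true,
        ClientError::Transport(_) => false,
    }
}

fn main() {
    // Before the fix, timeouts landed in Transport and were never retried.
    assert!(is_retryable(&ClientError::Timeout("request timed out".into())));
    assert!(!is_retryable(&ClientError::Transport("connection reset".into())));
}
```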
Bug 33: SSE Parser O(n) Dequeue Performance
Severity: Medium | Component: Client SSE parser
SseParser::next_frame() used Vec::remove(0) which is O(n) because it shifts all remaining elements. With high-throughput streaming (hundreds of events per second), this creates quadratic overhead.
Why tests missed it: Unit tests parse small event counts (1-3 events). The performance issue only manifests with large event queues.
Fix: Replaced Vec<Result<SseFrame, SseParseError>> with VecDeque for O(1) pop_front().
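The difference is a standard-library swap: `Vec::remove(0)` shifts every remaining element on each dequeue, while `VecDeque::pop_front()` is O(1):

```rust
use std::collections::VecDeque;

fn main() {
    let mut queue: VecDeque<u32> = (0..5).collect();
    // O(1) dequeue from the front; Vec::remove(0) would be O(n) here.
    assert_eq!(queue.pop_front(), Some(0));
    queue.push_back(5); // O(1) enqueue at the back
    assert_eq!(queue.pop_front(), Some(1));
    assert_eq!(queue.len(), 4);
}
```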
Bug 34: SSE Parser Silent UTF-8 Data Loss
Severity: Medium | Component: Client SSE parser
Malformed UTF-8 lines were silently discarded (return on from_utf8 failure). When a multi-byte UTF-8 character spans a TCP chunk boundary, the trailing bytes can appear invalid. The entire line would be dropped, causing silent data loss.
Why tests missed it: All test inputs use ASCII. The bug only manifests with non-ASCII content delivered across TCP fragment boundaries.
Fix: Changed to String::from_utf8_lossy() which replaces invalid bytes with U+FFFD instead of dropping the entire line.
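The failure mode is easy to reproduce: a multi-byte UTF-8 character (here `é` = `0xC3 0xA9`) split at a chunk boundary leaves the first chunk invalid. Strict parsing rejects the whole line; lossy conversion keeps it:

```rust
fn main() {
    // A line truncated mid-character: 0xC3 starts a 2-byte sequence.
    let chunk: &[u8] = b"data: caf\xC3";

    // Strict parsing fails, which previously dropped the entire line.
    assert!(std::str::from_utf8(chunk).is_err());

    // Lossy conversion keeps the line, replacing the bad byte with U+FFFD.
    let lossy = String::from_utf8_lossy(chunk);
    assert_eq!(lossy, "data: caf\u{FFFD}");
}
```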
Bug 35: Double-Encoded Path Traversal Bypass
Severity: Medium | Component: Server REST dispatcher
contains_path_traversal() only decoded one level of percent-encoding. An attacker could use %252E%252E (which decodes to %2E%2E, then to ..) to bypass the check.
Why tests missed it: No test used double-encoded inputs. The existing test only checked raw .. sequences.
Fix: Added a second decoding pass. Added tests for raw, single-encoded, and double-encoded path traversal.
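The two-pass idea can be sketched with a toy decoder — `%252E%252E` survives one pass as `%2E%2E`, so the traversal check must inspect the twice-decoded form as well. This is a simplified illustration, not the dispatcher's actual `contains_path_traversal()`:

```rust
// Toy single-pass percent-decoder (ASCII inputs only, for illustration).
fn percent_decode(s: &str) -> String {
    let bytes = s.as_bytes();
    let mut out = String::new();
    let mut i = 0;
    while i < bytes.len() {
        if bytes[i] == b'%' && i + 2 < bytes.len() {
            if let Ok(b) = u8::from_str_radix(&s[i + 1..i + 3], 16) {
                out.push(b as char);
                i += 3;
                continue;
            }
        }
        out.push(bytes[i] as char);
        i += 1;
    }
    out
}

fn contains_path_traversal(path: &str) -> bool {
    let once = percent_decode(path);
    let twice = percent_decode(&once);
    // Check raw, single-decoded, and double-decoded forms for "..".
    [path, once.as_str(), twice.as_str()].iter().any(|p| p.contains(".."))
}

fn main() {
    assert!(contains_path_traversal("/tasks/../secrets"));          // raw
    assert!(contains_path_traversal("/tasks/%2E%2E/secrets"));      // single-encoded
    assert!(contains_path_traversal("/tasks/%252E%252E/secrets"));  // double-encoded
    assert!(!contains_path_traversal("/tasks/task-1"));
}
```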
Bug 36: gRPC Stream Errors Lose Error Context
Severity: Low | Component: Client gRPC transport
grpc_stream_reader_task mapped gRPC stream errors to generic ClientError::Transport(format!("gRPC stream error: {}", status.message())) instead of using the existing grpc_code_to_error_code() function. This lost the structured error code information (NotFound, InvalidArgument, etc.).
Why tests missed it: gRPC streaming tests check for successful completion, not error paths within the stream.
Fix: Changed to use ClientError::Protocol(A2aError::new(grpc_code_to_error_code(...), ...)) for proper error classification.
Dogfooding: Test Coverage Matrix
The agent team runs 72 E2E tests across 8 test modules (79 with optional transports and signing). All tests pass in ~2.5 seconds.
Tests 1-10: Core Paths (basic.rs)
| # | Test | Transport | What it exercises |
|---|---|---|---|
| 1 | sync-jsonrpc-send | JSON-RPC | Synchronous SendMessage, artifact count |
| 2 | streaming-jsonrpc | JSON-RPC | SSE streaming, event ordering, artifact metadata |
| 3 | sync-rest-send | REST | REST synchronous path |
| 4 | streaming-rest | REST | REST SSE streaming |
| 5 | build-failure-path | REST | TaskState::Failed lifecycle |
| 6 | get-task | JSON-RPC | GetTask retrieval by ID |
| 7 | list-tasks | JSON-RPC | ListTasks with pagination token |
| 8 | push-config-crud | REST | Push config create/get/list/delete/verify lifecycle |
| 9 | multi-part-message | JSON-RPC | Text + data parts, HealthMonitor agent-to-agent |
| 10 | agent-to-agent | REST | Coordinator delegates to CodeAnalyzer |
Tests 11-20: Lifecycle (lifecycle.rs)
| # | Test | Transport | What it exercises |
|---|---|---|---|
| 11 | full-orchestration | REST | Coordinator -> CodeAnalyzer + BuildMonitor fan-out |
| 12 | health-orchestration | REST | Coordinator -> HealthMonitor -> all agents (3-level) |
| 13 | message-metadata | JSON-RPC | Request metadata passthrough |
| 14 | cancel-task | REST | Mid-stream cancellation via CancelTask |
| 15 | get-nonexistent-task | JSON-RPC | Error: TaskNotFound (-32001) |
| 16 | pagination-walk | JSON-RPC | ListTasks page_size=1, no duplicates |
| 17 | agent-card-discovery | REST | resolve_agent_card on REST endpoint |
| 18 | agent-card-jsonrpc | JSON-RPC | resolve_agent_card on JSON-RPC endpoint |
| 19 | push-not-supported | JSON-RPC | Error: NotSupported (-32003) for no-push agent |
| 20 | cancel-completed | REST | Error: InvalidState for completed task cancel |
Tests 21-30: Edge Cases (edge_cases.rs)
| # | Test | Transport | What it exercises |
|---|---|---|---|
| 21 | cancel-nonexistent | REST | Error: TaskNotFound for fake task ID |
| 22 | return-immediately | REST | return_immediately client config propagation |
| 23 | concurrent-requests | JSON-RPC | 5 parallel requests to same agent |
| 24 | empty-parts-rejected | JSON-RPC | Validation: empty message parts |
| 25 | get-task-rest | REST | GetTask via REST transport |
| 26 | list-tasks-rest | REST | ListTasks via REST transport |
| 27 | push-crud-jsonrpc | JSON-RPC | Push config CRUD via JSON-RPC (HealthMonitor) |
| 28 | resubscribe-rest | REST | SubscribeToTask concurrent resubscription |
| 29 | metrics-nonzero | — | All 4 agents have non-zero request counts |
| 30 | error-metrics-tracked | JSON-RPC | Error metric increments on invalid request |
Tests 31-40: Stress & Durability (stress.rs)
| # | Test | Transport | What it exercises |
|---|---|---|---|
| 31 | high-concurrency | JSON-RPC | 20 parallel requests to same agent |
| 32 | mixed-transport | Both | REST + JSON-RPC simultaneously |
| 33 | context-continuation | JSON-RPC | Two messages with same context_id |
| 34 | large-payload | JSON-RPC | 64KB text payload processing |
| 35 | stream-with-get-task | REST | GetTask during active SSE stream |
| 36 | push-delivery-e2e | REST | Push config set during streaming task |
| 37 | list-status-filter | JSON-RPC | ListTasks with TaskState::Completed filter |
| 38 | store-durability | JSON-RPC | Create 5 tasks, retrieve all 5 |
| 39 | queue-depth-metrics | — | Cumulative metrics tracking (>20 total requests) |
| 40 | event-ordering | JSON-RPC | Working -> artifacts -> Completed sequence |
Tests 41-50: SDK Dogfood Regressions (dogfood.rs)
| # | Test | Transport | What it exercises |
|---|---|---|---|
| 41 | card-url-correct | JSON-RPC | Agent card URL matches actual bound address |
| 42 | card-skills-valid | Both | All 4 agent cards have name, description, version, skills, interface |
| 43 | push-list-regression | JSON-RPC | ListTaskPushNotificationConfigs via JSON-RPC (regression for bug 11) |
| 44 | push-event-classify | REST | Webhook event classifier uses correct field names |
| 45 | resubscribe-jsonrpc | JSON-RPC | SubscribeToTask via JSON-RPC transport |
| 46 | multiple-artifacts | JSON-RPC | Multiple artifacts per task (>=2) |
| 47 | concurrent-streams | REST | 5 parallel SSE streams on same agent |
| 48 | list-context-filter | JSON-RPC | ListTasks with context_id filter |
| 49 | file-parts | JSON-RPC | Part::file_bytes with base64 content |
| 50 | history-length | JSON-RPC | history_length configuration via builder |
Tests 51-58: WebSocket, Multi-Tenancy & gRPC (transport.rs)
| # | Test | Transport | What it exercises |
|---|---|---|---|
| 51 | ws-send-message | WebSocket | JSON-RPC SendMessage over WebSocket (feature-gated) |
| 52 | ws-streaming | WebSocket | SendStreamingMessage with multi-frame streaming (feature-gated) |
| 53 | tenant-isolation | JSON-RPC | Different tenants cannot see each other's tasks |
| 54 | tenant-id-independence | Direct store | Same task ID in different tenants doesn't collide |
| 55 | tenant-count | Direct store | TenantAwareInMemoryTaskStore::tenant_count() tracking |
| 56 | grpc-send-message | gRPC | JSON-RPC SendMessage over gRPC transport (feature-gated) |
| 57 | grpc-streaming | gRPC | SendStreamingMessage over gRPC transport (feature-gated) |
| 58 | grpc-get-task | gRPC | GetTask after SendMessage over gRPC (feature-gated) |
Note: Tests 51-52 require the `websocket` feature flag (`cargo run -p agent-team --features websocket`). Tests 56-58 require the `grpc` feature flag (`cargo run -p agent-team --features grpc`).
Tests 61-79: E2E Coverage Gaps (coverage_gaps.rs)
| # | Test | Category | What it exercises |
|---|---|---|---|
| 61 | batch-single-element | Batch JSON-RPC | Single-element batch [{...}] with SendMessage |
| 62 | batch-multi-request | Batch JSON-RPC | Multi-request batch: SendMessage + GetTask |
| 63 | batch-empty | Batch JSON-RPC | Empty batch [] returns parse error |
| 64 | batch-mixed | Batch JSON-RPC | Mixed valid/invalid requests in batch |
| 65 | batch-streaming-rejected | Batch JSON-RPC | SendStreamingMessage in batch returns error |
| 66 | batch-subscribe-rejected | Batch JSON-RPC | SubscribeToTask in batch returns error |
| 67 | real-auth-rejection | Auth | Interceptor rejects unauthenticated requests |
| 68 | extended-agent-card | Cards | GetExtendedAgentCard via JSON-RPC |
| 69 | dynamic-agent-card | Cards | DynamicAgentCardHandler runtime card generation |
| 70 | agent-card-caching | Caching | ETag, If-None-Match, 304 Not Modified |
| 71 | backpressure-lagged | Streaming | Slow reader skips lagged events (capacity=2) |
| 72 | push-global-limit | Push config | Global push config limit enforcement (DoS prevention) |
| 73 | webhook-url-scheme | Push config | Rejects non-HTTP webhook URL schemes (ftp://, file://) |
| 74 | combined-filter | ListTasks | Combined status + context_id filtering |
| 75 | latency-metrics | Metrics | Verifies on_request() callback fires |
| 76 | timeout-retryable | Retry | Timeout errors are classified as retryable |
| 77 | concurrent-cancels | Stress | 10 parallel cancel requests on same task |
| 78 | stale-page-token | Pagination | Graceful handling of invalid page tokens |
| 79 | agent-card-signing | Signing | ES256 key generation, JWS sign/verify, tamper detection (signing feature) |
Coverage by SDK Feature
| SDK Feature | Tests exercising it |
|---|---|
| `AgentExecutor` trait | 1-5, 9-14, 22, 31-36, 38, 40, 46, 49, 51-52 |
| JSON-RPC dispatch | 1-2, 6-7, 9, 13, 15-16, 18-19, 23-24, 27, 29-34, 37-43, 45-46, 48-50, 53 |
| REST dispatch | 3-5, 8, 10-12, 14, 17, 20-22, 25-26, 28, 32, 35-36, 44, 47 |
| WebSocket dispatch | 51, 52 |
| SSE streaming | 2, 4, 14, 28, 35-36, 40, 44, 45, 47 |
| WebSocket streaming | 52 |
| `GetTask` | 6, 14, 25, 35 |
| `ListTasks` + pagination | 7, 16, 26, 37, 48, 50 |
| `CancelTask` | 14, 20, 21 |
| Push config CRUD | 8, 19, 27, 36, 43 |
| Agent card discovery | 17, 18, 41, 42 |
| `ServerInterceptor` | All tests (audit interceptor on every agent) |
| Metrics hooks | 29, 30, 39 |
| `return_immediately` | 22 |
| `CancellationToken` | 14 |
| Error handling | 15, 19, 20, 21, 24, 30 |
| Multi-agent orchestration | 10, 11, 12 |
| Concurrent requests | 23, 31, 32, 47 |
| `SubscribeToTask` resubscribe | 28, 45 |
| Multiple artifacts | 46 |
| File parts (binary) | 49 |
| History length config | 50 |
| Context ID filtering | 33, 48 |
| Multi-tenancy | 53, 54, 55 |
| `TenantAwareInMemoryTaskStore` | 53, 54, 55 |
| `TenantContext::scope` | 54, 55 |
| Batch JSON-RPC | 61, 62, 63, 64, 65, 66 |
| Auth rejection (interceptor) | 67 |
| `GetExtendedAgentCard` | 68 |
| `DynamicAgentCardHandler` | 69 |
| Agent card HTTP caching (ETag/304) | 70 |
| Backpressure / lagged events | 71 |
| Push config global limit | 72 |
| Webhook URL scheme validation | 73 |
| Combined status+context filter | 74 |
| Metrics callbacks | 29, 30, 39, 75 |
| Timeout retryability (Bug #32) | 76 |
| Concurrent cancel stress | 77 |
| Stale page token handling | 78 |
| Agent card signing (JWS/ES256) | 79 |
Dedicated Integration Tests (Outside Agent-Team)
In addition to the 72 agent-team E2E tests (79 with optional transports and signing), the SDK includes dedicated integration test suites:
| Suite | Location | Tests | What it covers |
|---|---|---|---|
| TLS/mTLS | crates/a2a-client/tests/tls_integration_tests.rs | 7 | Client cert validation, SNI hostname verification, unknown CA rejection, mutual TLS |
| WebSocket server | crates/a2a-server/tests/websocket_tests.rs | 7 | Send/stream, error handling, ping/pong, connection reuse, close frames |
| Memory & load stress | crates/a2a-server/tests/stress_tests.rs | 5 | 200 concurrent requests, sustained load (500 requests/10 waves), eviction under load, multi-tenant isolation (10×50), rapid connect/disconnect |
Features NOT Covered by E2E Tests
All previously uncovered features now have E2E test coverage. Agent card signing is covered by test 79 (`#[cfg(feature = "signing")]`).
Production Hardening
A checklist and guide for deploying a2a-rust agents in production.
Security
HTTPS
Always use HTTPS in production. a2a-rust doesn't bundle TLS — terminate TLS at your reverse proxy (nginx, Caddy, cloud load balancer) or use a TLS library:
Client ──HTTPS──→ [nginx/Caddy] ──HTTP──→ [a2a-rust agent]
CORS
Configure CORS for browser-based clients:
```rust
use a2a_protocol_sdk::server::CorsConfig;

// The dispatchers include CORS handling.
// Configure allowed origins for production.
```
Push Notification Security
The built-in `HttpPushSender` includes:
- SSRF protection — Rejects private/loopback IPs after DNS resolution
- Header injection prevention — Validates credentials for `\r`/`\n` characters
- HTTPS enforcement — Optionally require HTTPS webhook URLs
Path Traversal Protection
The REST dispatcher automatically rejects:
- `..` in path segments
- Percent-encoded `%2E%2E` and `%2e%2e`
- Paths that escape the expected route hierarchy
Body Size Limits
| Limit | Value | Transport |
|---|---|---|
| Request body | 4 MiB | REST |
| Query string | 4 KiB | REST |
| Event size | 16 MiB (configurable) | SSE streaming |
Reliability
Executor Timeout
Prevent hung tasks from consuming resources forever:
```rust
use std::time::Duration;

RequestHandlerBuilder::new(executor)
    .with_executor_timeout(Duration::from_secs(300))
    .build()
```
Concurrent Stream Limits
Bound memory usage from SSE connections:
```rust
RequestHandlerBuilder::new(executor)
    .with_max_concurrent_streams(1000)
    .build()
```
Task Store Limits
Prevent unbounded memory growth:
```rust
use a2a_protocol_sdk::server::TaskStoreConfig;

RequestHandlerBuilder::new(executor)
    .with_task_store_config(TaskStoreConfig {
        max_capacity: Some(100_000),
        task_ttl: Some(Duration::from_secs(3600)),
        eviction_interval: 64,
        max_page_size: 1000,
    })
    .build()
```
Use `TaskStore::count()` to monitor capacity utilization.
Graceful Shutdown
Implement on_shutdown in your executor for cleanup:
```rust
fn on_shutdown<'a>(&'a self) -> Pin<Box<dyn Future<Output = ()> + Send + 'a>> {
    Box::pin(async move {
        self.db_pool.close().await;
        self.cancel_token.cancel();
    })
}
```
Rate Limiting
Protect public-facing agents from abuse:
```rust
use a2a_protocol_sdk::server::{RateLimitInterceptor, RateLimitConfig};

RequestHandlerBuilder::new(executor)
    .with_interceptor(RateLimitInterceptor::new(RateLimitConfig {
        requests_per_window: 100,
        window_secs: 60,
    }))
    .build()
```
For advanced rate limiting (sliding windows, distributed counters), use a reverse proxy or implement a custom `ServerInterceptor`.
Client Retry & Reuse
When calling remote agents, build clients once and reuse them:
```rust
// Build once at startup
let client = ClientBuilder::new("http://agent.example.com")
    .with_retry_policy(RetryPolicy::default())
    .build()
    .unwrap();

// Reuse across all requests — client holds a connection pool
let result = client.send_message(params).await;
```
Observability
Structured Logging
Enable the tracing feature for structured logs:
```toml
[dependencies]
a2a-protocol-server = { version = "0.2", features = ["tracing"] }
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
```
```rust
use tracing_subscriber::EnvFilter;

tracing_subscriber::fmt()
    .with_env_filter(
        EnvFilter::try_from_default_env()
            .unwrap_or_else(|_| EnvFilter::new("info")),
    )
    .json() // JSON output for log aggregation
    .init();
```
Set `RUST_LOG=debug` for verbose output, or `RUST_LOG=a2a_protocol_server=debug` for server-specific logs.
Health Checks
Add a health endpoint alongside your A2A dispatchers:
```rust
// Simple health check handler
async fn health_check(
    req: hyper::Request<impl hyper::body::Body>,
) -> hyper::Response<Full<Bytes>> {
    if req.uri().path() == "/health" {
        hyper::Response::new(Full::new(Bytes::from("ok")))
    } else {
        dispatcher.dispatch(req).await
    }
}
```
Performance
Connection Handling
Both dispatchers use hyper's HTTP/1.1 and HTTP/2 support via `hyper_util::server::conn::auto::Builder`, which automatically negotiates the best protocol version.
No Web Framework Overhead
a2a-rust works directly with hyper — no middleware framework overhead. Cross-cutting concerns (rate limiting, request logging, auth) are handled via the interceptor chain rather than a framework.
Event Queue Sizing
Tune the event queue for your workload:
```rust
// High-throughput: larger queues
.with_event_queue_capacity(256)

// Memory-constrained: smaller queues
.with_event_queue_capacity(16)
```
Deployment Checklist
- HTTPS termination configured
- CORS origins restricted to known clients
- Executor timeout set
- Max concurrent streams limited
- Task store TTL and capacity configured
- Rate limiting enabled for public-facing agents
- A2A clients built once and reused (not per-request)
- Structured logging enabled
- Health check endpoint available
- Push notification URLs restricted to HTTPS
- Body/query size limits verified
- Graceful shutdown implemented
Next Steps
- GitHub Pages & CI/CD — Continuous deployment
- Configuration Reference — All configuration options
GitHub Pages & CI/CD
a2a-rust uses GitHub Actions for continuous integration, crate publishing, and documentation deployment.
CI Pipeline
The CI workflow (.github/workflows/ci.yml) runs on every push and PR:
| Job | Description |
|---|---|
| Format | cargo fmt --check — enforces consistent formatting |
| Clippy | cargo clippy -- -D warnings — catches common mistakes |
| Test | cargo test --workspace — runs all tests |
| Deny | cargo deny check — audits dependencies for vulnerabilities |
| Doc | cargo doc --no-deps — verifies documentation builds |
| Mutants | cargo mutants --workspace — zero surviving mutants required |
The Mutation Testing workflow (.github/workflows/mutants.yml) runs separately:
| Mode | Trigger | Scope |
|---|---|---|
| Full sweep | Nightly (03:00 UTC) + manual | All library crates |
| Incremental | Pull requests to main | Changed .rs files only |
The full sweep produces a mutation report artifact with caught/missed/unviable counts and a mutation score. Zero missed mutants is required — any surviving mutant fails the build.
All actions are SHA-pinned for supply chain security:
```yaml
- uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
```
Release Pipeline
The release workflow (.github/workflows/release.yml) triggers on version tags:
v0.2.0 tag → validate → ci + security → package → publish → github-release
Crates are published in dependency order:
1. `a2a-protocol-types` (no internal deps)
2. `a2a-protocol-client` + `a2a-protocol-server` (depend on types)
3. `a2a-protocol-sdk` (depends on all three)
Documentation Deployment
The docs workflow builds the mdBook and deploys to GitHub Pages:
```yaml
# .github/workflows/docs.yml
name: Deploy Documentation

on:
  push:
    branches: [main]
  workflow_dispatch:

permissions:
  contents: read
  pages: write
  id-token: write

concurrency:
  group: "pages"
  cancel-in-progress: false

jobs:
  build:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683
      - name: Install mdBook
        run: |
          mkdir -p $HOME/.local/bin
          curl -sSL https://github.com/rust-lang/mdBook/releases/download/v0.4.40/mdbook-v0.4.40-x86_64-unknown-linux-gnu.tar.gz | tar -xz -C $HOME/.local/bin
          echo "$HOME/.local/bin" >> $GITHUB_PATH
      - name: Build book
        run: mdbook build book
      - uses: actions/configure-pages@v5
      - uses: actions/upload-pages-artifact@v3
        with:
          path: book/book

  deploy:
    needs: build
    runs-on: ubuntu-latest
    environment:
      name: github-pages
      url: ${{ steps.deployment.outputs.page_url }}
    steps:
      - id: deployment
        uses: actions/deploy-pages@v4
```
Setting Up GitHub Pages
1. Go to Settings → Pages in your GitHub repo
2. Set Source to "GitHub Actions"
3. Push to `main` to trigger the first deployment
4. Your docs will be live at `https://tomtom215.github.io/a2a-rust/`
Building Locally
```sh
# Install mdBook
cargo install mdbook

# Build
mdbook build book

# Serve with hot reload
mdbook serve book --open
```
Cargo Documentation
Rust API docs are generated separately:
```sh
# Build API docs for all crates
cargo doc --workspace --no-deps --open
```
Consider deploying these alongside the book, or linking to docs.rs once published.
Next Steps
- Configuration Reference — All configuration options
- Changelog — Version history
Pitfalls & Lessons Learned
A catalog of non-obvious problems encountered during development. Each entry documents a real issue and its solution.
Serde Pitfalls
Untagged enums hide inner errors
#[serde(untagged)] on SendMessageResponse and StreamResponse swallows the real deserialization error and replaces it with a generic "data did not match any variant" message.
Workaround: When debugging, temporarily switch to an externally tagged enum to see the real error. In production, log the raw JSON before attempting deserialization.
#[serde(default)] vs Option<T>
Using #[serde(default)] on a Vec<T> field means the field is present but empty when omitted from JSON. Using Option<Vec<T>> means it is absent (None).
The A2A spec distinguishes between "omitted" and "empty array" for fields like history and artifacts, so Option<Vec<T>> is the correct choice.
```rust
// Wrong: field is always present (empty vec if omitted)
#[serde(default)]
pub history: Vec<Message>,

// Correct: field is absent when not provided
#[serde(skip_serializing_if = "Option::is_none")]
pub history: Option<Vec<Message>>,
```
#[non_exhaustive] on enums breaks downstream matches
Adding #[non_exhaustive] to TaskState, ErrorCode, etc. forces downstream crates to include a wildcard arm. This is intentional for forward compatibility:
```rust
match task.status.state {
    TaskState::Completed => { /* ... */ }
    TaskState::Failed => { /* ... */ }
    _ => { /* Handle future states */ }
}
```
Hyper 1.x Pitfalls
Body is not Clone
Hyper 1.x Incoming body is consumed on read. You cannot read the body twice. Buffer it first:
```rust
use http_body_util::BodyExt;

let bytes = req.into_body().collect().await?.to_bytes();
// Now work from `bytes` (can be cloned, read multiple times)
```
Response builder panics on invalid header values
hyper::Response::builder().header(k, v) silently stores the error and panics when .body() is called.
Solution: Always use unwrap_or_else with a fallback response, or validate header values with .parse::<HeaderValue>() first. a2a-rust uses unwrap_or_else(|_| fallback_error_response()) in production paths.
size_hint() upper bound may be None
hyper::body::Body::size_hint().upper() returns None when Content-Length is absent. Always check for None before comparing against the body size limit:
```rust
if let Some(upper) = body.size_hint().upper() {
    if upper > MAX_BODY_SIZE {
        return Err(/* payload too large */);
    }
}
```
SSE Streaming Pitfalls
SSE parser must handle partial lines
SSE events may arrive split across TCP frames. The parser must buffer partial lines and only process complete \n-terminated lines. The EventBuffer in a2a-protocol-client handles this correctly, but naive lines() iterators will break on partial frames.
Memory limit on buffered SSE data
A malicious server can send an infinite SSE event (no \n\n terminator). The client parser enforces a 16 MiB cap on buffered event data to prevent OOM.
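Both points can be illustrated with a minimal, hypothetical line buffer (std-only; this is not the SDK's actual `EventBuffer` API). It holds incomplete data across frames, yields only complete `\n`-terminated lines, and rejects input once a cap would be exceeded. A production parser would also handle `\r\n` and SSE event dispatch.

```rust
/// Toy SSE line buffer: accumulates bytes across TCP frames and yields
/// only complete '\n'-terminated lines, with a hard cap on buffered data.
struct LineBuffer {
    buf: Vec<u8>,
    cap: usize,
}

impl LineBuffer {
    fn new(cap: usize) -> Self {
        Self { buf: Vec::new(), cap }
    }

    /// Feed one frame; returns the complete lines it unlocked,
    /// or an error if the buffer cap would be exceeded.
    fn feed(&mut self, frame: &[u8]) -> Result<Vec<String>, &'static str> {
        if self.buf.len() + frame.len() > self.cap {
            return Err("buffered SSE data exceeds cap");
        }
        self.buf.extend_from_slice(frame);
        let mut lines = Vec::new();
        // Drain every complete line; partial data stays buffered.
        while let Some(pos) = self.buf.iter().position(|&b| b == b'\n') {
            let line: Vec<u8> = self.buf.drain(..=pos).collect();
            lines.push(String::from_utf8_lossy(&line[..pos]).into_owned());
        }
        Ok(lines)
    }
}

fn main() {
    let mut lb = LineBuffer::new(1024);
    // First frame ends mid-line: nothing is yielded yet.
    assert!(lb.feed(b"data: hel").unwrap().is_empty());
    // The rest of the line arrives in the next frame.
    assert_eq!(lb.feed(b"lo\n").unwrap(), vec!["data: hello"]);
    // A never-terminated event is bounded by the cap.
    assert!(LineBuffer::new(4).feed(b"toolong").is_err());
}
```

A naive `lines()` iterator over each frame would have split `data: hello` into two broken fragments here.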
Push Notification Pitfalls
SSRF via webhook URLs
Push notification url fields can point to internal services (e.g., http://169.254.169.254/ — the cloud metadata endpoint).
Solution: The HttpPushSender resolves the URL and rejects private/loopback IP addresses after DNS resolution, not on the URL string alone. This prevents DNS rebinding attacks.
Header injection via credentials
Push notification credentials can contain newlines that inject additional HTTP headers.
Solution: The push sender validates that credential values contain no \r or \n characters before using them in HTTP headers.
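A sketch of that check, using a hypothetical helper name (the real validator lives inside the push sender):

```rust
/// Reject credential values containing CR or LF before they are
/// placed into an HTTP header. Illustrative helper, not the SDK API.
fn is_safe_header_value(value: &str) -> bool {
    !value.contains('\r') && !value.contains('\n')
}

fn main() {
    assert!(is_safe_header_value("Bearer abc123"));
    // A CRLF would let an attacker smuggle a second header line.
    assert!(!is_safe_header_value("abc\r\nX-Injected: 1"));
}
```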
Async / Tokio Pitfalls
Object-safe async traits need Pin<Box<dyn Future>>
Rust does not yet support async fn in traits that are used as dyn Trait. The TaskStore, AgentExecutor, and PushSender traits use explicit Pin<Box<dyn Future<Output = ...> + Send + 'a>> return types:
```rust
// The pattern for all object-safe async trait methods
fn my_method<'a>(
    &'a self,
    args: &'a Args,
) -> Pin<Box<dyn Future<Output = Result<T>> + Send + 'a>> {
    Box::pin(async move {
        // async code here
        Ok(result)
    })
}
```
Cancellation token cleanup
CancellationTokens that are never cancelled accumulate in the token map. The handler cleans up already-cancelled tokens before inserting new ones, and enforces a hard cap of 10,000 tokens.
Amortized eviction vs test expectations
Running O(n) eviction on every save() call is expensive. The task store amortizes eviction to every 64 writes. Tests that depend on eviction must call run_eviction() explicitly or account for the amortization interval.
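The amortization can be sketched with a simple atomic write counter (illustrative only, not the store's internal type):

```rust
use std::sync::atomic::{AtomicU64, Ordering};

/// Run the O(n) eviction sweep only once every `interval` writes.
struct EvictionCounter {
    writes: AtomicU64,
    interval: u64,
}

impl EvictionCounter {
    fn new(interval: u64) -> Self {
        Self { writes: AtomicU64::new(0), interval }
    }

    /// Called on every save(); returns true when this write
    /// should trigger an eviction sweep.
    fn should_evict(&self) -> bool {
        let n = self.writes.fetch_add(1, Ordering::Relaxed) + 1;
        n % self.interval == 0
    }
}

fn main() {
    let c = EvictionCounter::new(64);
    // Over 128 writes, only writes 64 and 128 pay the eviction cost.
    let sweeps = (0..128).filter(|_| c.should_evict()).count();
    assert_eq!(sweeps, 2);
}
```

This is exactly why a test that saves a handful of tasks and expects immediate eviction will fail unless it calls `run_eviction()` itself.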
std::sync::RwLock poisoning — fail-fast vs silent ignore
When a thread panics while holding a std::sync::RwLock, the lock becomes "poisoned." There are two strategies:
- Silent ignore (`lock.read().ok()?`) — returns `None`/no-op on poisoned locks. This hides bugs.
- Fail-fast (`lock.read().expect("poisoned")`) — panics immediately, surfacing the problem.
a2a-rust uses fail-fast. If you see a "lock poisoned" panic, the root cause is a prior panic in another thread. Fix that panic first.
Rate limiter atomics under read lock — CAS loop required
When multiple threads share a rate limiter bucket under a read lock, non-atomic multi-step operations (load window → check → store count) create TOCTOU races. Two threads can both see an old window and both reset the counter.
Solution: Use compare_exchange (CAS) to atomically swap the window number. Only one thread wins the CAS; others loop and retry with the updated state.
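A minimal sketch of the CAS pattern with `std::sync::atomic` (the bucket layout and memory orderings here are illustrative, not the SDK's actual implementation):

```rust
use std::sync::atomic::{AtomicU64, Ordering};

/// One rate-limit bucket: the current window number plus a request count.
struct Bucket {
    window: AtomicU64,
    count: AtomicU64,
}

impl Bucket {
    fn new() -> Self {
        Self { window: AtomicU64::new(0), count: AtomicU64::new(0) }
    }

    /// Returns true if the request is admitted. Window rollover uses
    /// compare_exchange, so only one thread resets the counter; losers
    /// loop and re-read the updated state.
    fn check(&self, now_window: u64, limit: u64) -> bool {
        loop {
            let w = self.window.load(Ordering::Acquire);
            if w == now_window {
                return self.count.fetch_add(1, Ordering::AcqRel) < limit;
            }
            if self
                .window
                .compare_exchange(w, now_window, Ordering::AcqRel, Ordering::Acquire)
                .is_ok()
            {
                // This thread won the rollover: it counts as request 1.
                self.count.store(1, Ordering::Release);
                return true;
            }
        }
    }
}

fn main() {
    let b = Bucket::new();
    assert!(b.check(1, 2));  // first request in window 1
    assert!(b.check(1, 2));  // second request admitted
    assert!(!b.check(1, 2)); // third exceeds the limit
}
```

A load/check/store sequence in place of the `compare_exchange` would let two threads both observe the old window and both reset the count, admitting more than `limit` requests.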
Transport Pitfalls
Query string parameters must be URL-encoded
When building REST transport query strings from JSON values, parameter values must be percent-encoded per RFC 3986. A value like status=active&role=admin without encoding becomes three separate query parameters instead of one.
Solution: The build_query_string() function in the REST transport encodes all non-unreserved characters (A-Z a-z 0-9 - . _ ~).
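A std-only sketch of such an encoder (hypothetical standalone function; the real `build_query_string()` may differ in detail):

```rust
/// Percent-encode a query value: every byte outside the RFC 3986
/// unreserved set (A-Z a-z 0-9 - . _ ~) is escaped as %XX.
fn percent_encode(value: &str) -> String {
    let mut out = String::new();
    for byte in value.bytes() {
        match byte {
            b'A'..=b'Z' | b'a'..=b'z' | b'0'..=b'9' | b'-' | b'.' | b'_' | b'~' => {
                out.push(byte as char)
            }
            _ => out.push_str(&format!("%{:02X}", byte)),
        }
    }
    out
}

fn main() {
    // Unencoded, this single value would parse as three query parameters.
    assert_eq!(percent_encode("active&role=admin"), "active%26role%3Dadmin");
}
```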
WebSocket stream termination must use structured parsing
Detecting stream completion by checking text.contains("stream_complete") is fragile — it false-positives on any payload text containing that substring, and misses terminal status updates that don't contain that exact string.
Solution: Deserialize the JSON-RPC frame and check the result object for terminal task states (completed, failed, canceled, rejected) or the stream_complete sentinel.
Pre-bind listeners for all server types when building agent cards
The gRPC dispatcher had the same placeholder-URL bug as the HTTP dispatchers: the agent card was built before the server bound to a port, so the card contained "http://placeholder". This applied to any transport that binds its own port.
Solution: Pre-bind a TcpListener, extract the address, build the handler with the correct URL, then pass the listener via serve_with_listener() (gRPC) or the accept loop (HTTP).
Background tasks cannot propagate errors — log them
In a tokio::spawn-ed task, errors have no caller to return to. Using let _ = store.save(...).await silently drops failures, causing task state to diverge between the event queue and the persistent store.
Solution: Use if let Err(e) = store.save(...).await { trace_error!(...) } so failures are observable via logs and metrics.
Push Config Store Pitfalls
Per-resource limits are not enough — add global limits
A per-task push config limit (e.g., 100 configs per task) does not prevent an attacker from creating millions of tasks with 100 configs each. Always pair per-resource limits with a global cap (e.g., max_total_configs = 100_000). The global check must run before the per-resource check in the write path.
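The ordering can be sketched as a pure check function (names and limit values here are illustrative):

```rust
/// Write-path admission check: the global cap is consulted before the
/// per-task cap, as described above. Hypothetical helper, not SDK API.
fn may_insert(
    total_configs: usize,
    configs_for_task: usize,
    max_total: usize,
    max_per_task: usize,
) -> Result<(), &'static str> {
    if total_configs >= max_total {
        return Err("global push config limit reached");
    }
    if configs_for_task >= max_per_task {
        return Err("per-task push config limit reached");
    }
    Ok(())
}

fn main() {
    assert!(may_insert(0, 0, 100_000, 100).is_ok());
    // An attacker creating many tasks still hits the global ceiling,
    // even though each individual task is under its per-task limit.
    assert!(may_insert(100_000, 0, 100_000, 100).is_err());
}
```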
Webhook URL scheme validation
Checking for private IPs and hostnames in webhook URLs is insufficient. URLs like ftp://evil.com/hook or file:///etc/passwd bypass IP-based SSRF checks entirely. Always validate that the URL scheme is http or https before performing any further validation.
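A minimal sketch of the scheme check (hypothetical helper; schemes are case-insensitive per RFC 3986, so normalize before comparing):

```rust
/// Accept only http/https webhook URLs before any host or IP analysis.
fn has_http_scheme(url: &str) -> bool {
    let lower = url.to_ascii_lowercase();
    lower.starts_with("http://") || lower.starts_with("https://")
}

fn main() {
    assert!(has_http_scheme("https://example.com/hook"));
    assert!(has_http_scheme("HTTPS://example.com/hook")); // case-insensitive
    // These bypass IP-based SSRF checks entirely if not rejected here.
    assert!(!has_http_scheme("ftp://evil.com/hook"));
    assert!(!has_http_scheme("file:///etc/passwd"));
}
```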
Performance Pitfalls
Vec<u8> vs Bytes in retry loops
Cloning a Vec<u8> inside a retry loop allocates a full heap copy each time. Use bytes::Bytes (reference-counted) so that .clone() is just an atomic reference count increment. This matters for push notification delivery where large payloads may be retried 3+ times.
Graceful Shutdown Pitfalls
Bound executor cleanup with a timeout
executor.on_shutdown().await can hang indefinitely if the executor's cleanup routine blocks. Always wrap with tokio::time::timeout() — the default is 10 seconds. Users can override via shutdown_with_timeout(duration).
gRPC Pitfalls
Map all tonic status codes, not just the common ones
Only mapping 4 gRPC status codes to A2A error codes loses semantic information for Unauthenticated, PermissionDenied, ResourceExhausted, etc. Map all relevant codes explicitly so clients can distinguish between error categories.
Feature-gated code paths need their own dogfood pass
The gRPC path behind #[cfg(feature = "grpc")] had the exact same placeholder URL bug that was already fixed for HTTP (Bug #12 → Bug #18). Feature-gated code is easy to miss during reviews. Always re-verify known bug patterns across all feature gates.
Cancel / State Machine Pitfalls
Check terminal state before emitting cancel
If an executor completes between the handler's cancel check and the cancel signal arrival, unconditionally emitting TaskState::Canceled causes an invalid Completed → Canceled state transition. Guard cancel emission with cancellation_token.is_cancelled() or a terminal-state check.
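The guard can be sketched with a local copy of the state enum (illustrative; the real handler additionally consults the cancellation token):

```rust
/// State names mirror the A2A TaskState enum described in this book.
#[derive(Debug, PartialEq, Clone, Copy)]
enum TaskState {
    Submitted,
    Working,
    Completed,
    Failed,
    Canceled,
    Rejected,
}

fn is_terminal(s: TaskState) -> bool {
    matches!(
        s,
        TaskState::Completed | TaskState::Failed | TaskState::Canceled | TaskState::Rejected
    )
}

/// Returns the state to emit for a cancel request, or None if the
/// task already reached a terminal state (no invalid transition).
fn cancel_transition(current: TaskState) -> Option<TaskState> {
    if is_terminal(current) {
        None
    } else {
        Some(TaskState::Canceled)
    }
}

fn main() {
    assert_eq!(cancel_transition(TaskState::Working), Some(TaskState::Canceled));
    // Executor finished first: never emit Completed → Canceled.
    assert_eq!(cancel_transition(TaskState::Completed), None);
}
```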
Re-check cancellation between multi-phase work
If an executor performs multiple phases (e.g., two artifact emissions), check cancellation between phases. Otherwise, cancellation between phases is delayed until the next natural check point.
Server Accept Loop Pitfalls
break vs continue on transient accept errors
Using Err(_) => break in a TCP accept loop kills the entire server on a single transient error (e.g., EMFILE when the file descriptor limit is reached). Use Err(_) => continue to skip the bad connection and keep serving. Log the error for observability but do not let it take down the server.
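A mock accept loop makes the difference concrete (illustrative: a slice of results stands in for a real `TcpListener`):

```rust
/// Count connections "served" from a sequence of accept results,
/// either breaking or continuing on errors.
fn serve(results: &[Result<&str, &str>], break_on_err: bool) -> usize {
    let mut served = 0;
    for r in results {
        match r {
            Ok(_conn) => served += 1,
            Err(_e) if break_on_err => break, // kills the whole server
            Err(_e) => continue,              // log and keep serving
        }
    }
    served
}

fn main() {
    let results = [Ok("c1"), Err("EMFILE"), Ok("c2"), Ok("c3")];
    // break: one transient EMFILE takes the server down after one conn.
    assert_eq!(serve(&results, true), 1);
    // continue: the bad accept is skipped and everyone else is served.
    assert_eq!(serve(&results, false), 3);
}
```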
Workspace / Cargo Pitfalls
cargo-fuzz needs its own workspace
The fuzz/ directory contains its own Cargo.toml with [workspace] to prevent cargo-fuzz from conflicting with the main workspace. The fuzz crate references a2a-protocol-types via a relative path dependency.
Feature unification across workspace
Enabling a feature in one crate (e.g., signing in a2a-protocol-types) enables it for all crates in the workspace during cargo test --workspace. Use --no-default-features or per-crate test commands when testing feature gates.
Testing Pitfalls
HashMap iteration order is non-deterministic
The InMemoryTaskStore uses a HashMap internally. list() must sort results by task ID before applying pagination; otherwise, page boundaries shift between runs and tests become flaky.
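A std-only sketch of the sort-then-slice approach (hypothetical function, not the store's actual `list()` signature):

```rust
use std::collections::HashMap;

/// Deterministic pagination over a HashMap-backed store: sort by key
/// first, then slice the page. Without the sort, page boundaries
/// shift between runs and tests become flaky.
fn list_page(store: &HashMap<String, u32>, offset: usize, limit: usize) -> Vec<String> {
    let mut ids: Vec<&String> = store.keys().collect();
    ids.sort();
    ids.into_iter().skip(offset).take(limit).cloned().collect()
}

fn main() {
    let mut store = HashMap::new();
    for id in ["task-3", "task-1", "task-2"] {
        store.insert(id.to_string(), 0);
    }
    // Pages are stable regardless of HashMap iteration order.
    assert_eq!(list_page(&store, 0, 2), vec!["task-1", "task-2"]);
    assert_eq!(list_page(&store, 2, 2), vec!["task-3"]);
}
```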
Percent-encoded path traversal
Checking path.contains("..") is insufficient. Attackers can use %2E%2E (or mixed-case %2e%2E) to bypass the check.
Solution: The REST dispatcher percent-decodes the path before checking for .. sequences.
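A simplified sketch of decode-then-check (illustrative; a production decoder should also treat malformed escapes strictly rather than passing them through as this one does):

```rust
/// Decode %XX escapes, assuming ASCII input for simplicity.
fn percent_decode(path: &str) -> String {
    let bytes = path.as_bytes();
    let mut out = String::new();
    let mut i = 0;
    while i < bytes.len() {
        if bytes[i] == b'%' && i + 2 < bytes.len() {
            if let Ok(b) = u8::from_str_radix(&path[i + 1..i + 3], 16) {
                out.push(b as char);
                i += 3;
                continue;
            }
        }
        out.push(bytes[i] as char);
        i += 1;
    }
    out
}

/// Checking the raw path would miss "%2E%2E" and mixed-case variants.
fn is_traversal(path: &str) -> bool {
    percent_decode(path).contains("..")
}

fn main() {
    assert!(is_traversal("/v1/tasks/%2E%2E/secrets")); // encoded ".."
    assert!(is_traversal("/v1/tasks/%2e%2E/secrets")); // mixed case
    assert!(is_traversal("/v1/../etc"));               // plain ".."
    assert!(!is_traversal("/v1/tasks/abc"));
}
```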
Next Steps
- Architecture Decision Records — Design decisions behind these choices
- Configuration Reference — All tunable parameters
Architecture Decision Records
Key design decisions for a2a-rust, documented as ADRs. Each record captures the context, decision, and rationale.
ADR 0001: Workspace Crate Structure
Status: Accepted
Context: The A2A protocol has three distinct concerns with different dependency trees: types (pure data, no I/O), client (HTTP sending), and server (HTTP receiving). A single-crate approach forces every user to compile the full dependency tree regardless of need.
Decision: Four crates in a Cargo workspace:
| Crate | Purpose | Key Dependencies |
|---|---|---|
| `a2a-protocol-types` | Wire types only | `serde`, `serde_json` |
| `a2a-protocol-client` | HTTP client | `hyper`, `tokio` |
| `a2a-protocol-server` | Server framework | `hyper`, `tokio` |
| `a2a-protocol-sdk` | Umbrella re-exports | All above |
Rationale: An agent server implementor doesn't pay for the client dependency tree and vice versa. The types crate is usable without any async runtime — useful for codegen, validation, or non-HTTP transports.
ADR 0002: Dependency Philosophy
Status: Accepted
Context: Rust SDK quality is inversely correlated with transitive dependency count. Each dependency adds compile time, supply chain attack surface, version conflicts, and potential license issues.
Decision: Minimal dependency footprint:
- No web frameworks (axum, actix, warp)
- No TLS bundled (bring your own or use a proxy)
- No logging framework forced (optional `tracing` feature)
- In-tree SSE parser instead of a third-party crate
- `serde` + `hyper` as the only heavyweight deps
Rationale: A production-grade SDK must work in corporate environments with strict dependency audits, embedded/constrained environments, and without forcing users into specific TLS or logging frameworks.
ADR 0003: Async Runtime Strategy
Status: Accepted
Context: Rust async code is runtime-agnostic at the language level, but hyper 1.x uses tokio internally. Making the SDK runtime-agnostic would require wrapping every I/O call behind an abstraction layer.
Decision: Tokio is the mandatory async runtime. It is a required dependency (not optional, not feature-gated) for the I/O crates. a2a-protocol-types has no async runtime dependency.
Rationale: >95% of Rust async production code runs on tokio. The complexity cost of runtime abstraction is not justified by the ~5% of users on other runtimes.
ADR 0004: Transport Abstraction Design
Status: Accepted
Context: A2A defines three transport bindings (JSON-RPC, REST, gRPC). Both client and server share protocol logic that must not be duplicated across transport implementations.
Decision: Three-layer architecture:
Dispatcher (transport) → RequestHandler (protocol) → AgentExecutor (user logic)
- Dispatchers handle HTTP-level concerns (routing, content types, CORS)
- RequestHandler contains all protocol logic (task lifecycle, stores, streaming)
- AgentExecutor is the user's entry point
The Transport trait (client) and Dispatcher trait (server) make transports pluggable. Both share the same handler/client core.
Rationale: Adding gRPC support later requires only a new dispatcher/transport — zero changes to protocol logic or user code.
ADR 0005: SSE Streaming Design
Status: Accepted
Context: A2A streaming uses Server-Sent Events (SSE). Rust lacks a battle-tested, zero-dep SSE library that is hyper 1.x native.
Decision: In-tree SSE implementation for both client parsing and server emission:
- Client parser: ~220 lines, handles partial TCP frames, enforces 16 MiB buffer cap
- Server emitter: ~200 lines, formats SSE `data:` lines with proper `\n\n` terminators
- Zero additional dependencies beyond hyper
Rationale: Third-party SSE crates either use older hyper versions, carry unnecessary dependencies, or lack proper backpressure. The in-tree implementation is small enough to audit, test, and maintain.
ADR 0006: Mutation Testing as a Required Quality Gate
Status: Accepted
Context: The test suite includes unit, integration, property, fuzz, and E2E dogfood tests — but none of these measure whether the tests actually detect real bugs. A test suite can achieve 100% line coverage with trivial assertions. At multi-data-center deployment scales, the bugs that escape traditional testing have the highest blast radius.
Decision: Adopt cargo-mutants as a mandatory quality gate with zero surviving mutants required across all library crates. CI runs a full nightly sweep and incremental PR checks on changed files. Configuration is centralized in mutants.toml.
Rationale: Mutation testing is the only technique that directly measures fault detection capability. It provides an objective, automated answer to "would this test suite catch a real bug at this location?" The compute cost is managed through incremental mode, nightly scheduling, and exclusion of unproductive targets.
Summary
| ADR | Key Decision |
|---|---|
| 0001 | Four-crate workspace (types, client, server, sdk) |
| 0002 | Minimal dependencies, no bundled framework |
| 0003 | Tokio as mandatory runtime |
| 0004 | Three-layer architecture (dispatcher → handler → executor) |
| 0005 | In-tree SSE parser/emitter, zero additional deps |
| 0006 | cargo-mutants as mandatory quality gate, zero surviving mutants |
The full ADR documents are in the docs/adr/ directory.
Next Steps
- Configuration Reference — All tunable parameters
- Pitfalls & Lessons Learned — Practical issues and solutions
Configuration Reference
Complete reference of all configuration options across a2a-rust crates.
Server Configuration
RequestHandlerBuilder
| Option | Type | Default | Description |
|---|---|---|---|
| `with_agent_card` | `AgentCard` | None | Discovery card for `/.well-known/agent.json` |
| `with_task_store` | `impl TaskStore` | `InMemoryTaskStore` | Custom task storage backend |
| `with_task_store_config` | `TaskStoreConfig` | No limits | TTL and capacity for default store |
| `with_push_config_store` | `impl PushConfigStore` | `InMemoryPushConfigStore` | Custom push config storage |
| `with_push_sender` | `impl PushSender` | None | Webhook delivery implementation |
| `with_interceptor` | `impl ServerInterceptor` | Empty chain | Server middleware |
| `with_executor_timeout` | `Duration` | None | Max time for executor completion |
| `with_event_queue_capacity` | `usize` | 64 | Bounded channel size per stream |
| `with_max_event_size` | `usize` | 16 MiB | Max serialized SSE event size |
| `with_max_concurrent_streams` | `usize` | Unbounded | Limit concurrent SSE connections |
| `with_event_queue_write_timeout` | `Duration` | 5s | Write timeout for event queue sends |
| `with_metrics` | `impl Metrics` | `NoopMetrics` | Metrics observer for handler activity |
| `with_handler_limits` | `HandlerLimits` | See below | Configurable validation limits |
HandlerLimits
| Field | Type | Default | Description |
|---|---|---|---|
| `max_id_length` | `usize` | 1,024 | Maximum task/context ID length |
| `max_metadata_size` | `usize` | 1 MiB | Maximum serialized metadata size |
| `max_cancellation_tokens` | `usize` | 10,000 | Cleanup sweep threshold |
| `max_token_age` | `Duration` | 1 hour | Stale token eviction age |
| `push_delivery_timeout` | `Duration` | 5s | Per-webhook delivery timeout |
TaskStoreConfig
| Field | Type | Default | Description |
|---|---|---|---|
| `max_capacity` | `Option<usize>` | 10,000 | Maximum stored tasks; oldest terminal tasks evicted on overflow |
| `task_ttl` | `Option<Duration>` | 1 hour | TTL for completed/failed tasks |
| `eviction_interval` | `u64` | 64 | Writes between automatic eviction sweeps |
| `max_page_size` | `u32` | 1,000 | Maximum tasks per page in list queries |
InMemoryPushConfigStore
| Constructor | Default | Description |
|---|---|---|
| `::new()` | 100 | Default max push configs per task |
| `::with_max_configs_per_task(N)` | — | Custom per-task push config limit |
DispatchConfig
Shared configuration for both JSON-RPC and REST dispatchers. Pass to `JsonRpcDispatcher::with_config()` or `RestDispatcher::with_config()`.
| Field | Type | Default | Description |
|---|---|---|---|
| `max_request_body_size` | `usize` | 4 MiB | Larger bodies return 413 |
| `body_read_timeout` | `Duration` | 30s | Slow-loris protection |
| `max_query_string_length` | `usize` | 4,096 | REST only; longer queries return 414 |
| `sse_keep_alive_interval` | `Duration` | 30s | Periodic keep-alive comment interval for SSE streams |
| `sse_channel_capacity` | `usize` | 64 | SSE response body channel buffer size |
GrpcConfig
Configuration for the gRPC dispatcher (requires grpc feature).
| Field | Type | Default | Description |
|---|---|---|---|
| `max_message_size` | `usize` | 4 MiB | Maximum inbound/outbound message size |
| `concurrency_limit` | `usize` | 256 | Max concurrent gRPC requests per connection |
| `stream_channel_capacity` | `usize` | 64 | Bounded channel for streaming responses |
PushRetryPolicy
Configurable retry policy for `HttpPushSender`. Pass via `HttpPushSender::with_retry_policy()`.
| Field | Type | Default | Description |
|---|---|---|---|
| `max_attempts` | `usize` | 3 | Maximum delivery attempts |
| `backoff` | `Vec<Duration>` | [1s, 2s] | Backoff durations between retries |
RateLimitConfig
| Field | Type | Default | Description |
|---|---|---|---|
| `requests_per_window` | `u64` | 100 | Max requests per caller per window |
| `window_secs` | `u64` | 60 | Window duration in seconds |
Internal Limits
| Limit | Value | Description |
|---|---|---|
| Event queue type | broadcast | Fan-out to multiple subscribers; slow readers skip missed events |
| Rate limiter cleanup interval | 256 checks | Stale buckets (from departed callers) evicted every 256 check() calls |
| Rate limiter window CAS | Lock-free | Window transitions use compare_exchange to avoid TOCTOU races |
| Credential store poisoning | Fail-fast | InMemoryCredentialsStore panics on poisoned locks rather than returning None |
Client Configuration
ClientBuilder
| Option | Type | Default | Description |
|---|---|---|---|
| `with_protocol_binding` | `&str` | Auto-detect | Transport: "JSONRPC", "REST", or "GRPC" |
| `with_timeout` | `Duration` | 30s | Per-request timeout |
| `with_connection_timeout` | `Duration` | 10s | TCP connection timeout |
| `with_stream_connect_timeout` | `Duration` | 30s | SSE connect timeout |
| `with_retry_policy` | `RetryPolicy` | None | Retry on transient errors |
| `with_accepted_output_modes` | `Vec<String>` | `["text/plain", "application/json"]` | MIME types accepted |
| `with_history_length` | `u32` | None | Messages in responses |
| `with_return_immediately` | `bool` | false | Don't wait for completion |
| `with_interceptor` | `impl CallInterceptor` | Empty chain | Client middleware |
GrpcTransportConfig
Configuration for the gRPC client transport (requires grpc feature).
| Field | Type | Default | Description |
|---|---|---|---|
| `timeout` | `Duration` | 30s | Per-request timeout |
| `connect_timeout` | `Duration` | 10s | Connection timeout |
| `max_message_size` | `usize` | 4 MiB | Maximum message size |
| `stream_channel_capacity` | `usize` | 64 | Streaming response buffer |
RetryPolicy
| Field | Type | Default | Description |
|---|---|---|---|
| `max_retries` | `u32` | 3 | Maximum retry attempts |
| `initial_backoff` | `Duration` | 500ms | Backoff before first retry |
| `max_backoff` | `Duration` | 30s | Caps exponential growth |
| `backoff_multiplier` | `f64` | 2.0 | Multiplier per retry |
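The growth these fields describe can be sketched as a pure function (illustrative, not the client's internal code):

```rust
use std::time::Duration;

/// Exponential backoff in the shape of the table above: the initial
/// delay grows by `multiplier` per retry, capped at `max`.
fn backoff(attempt: u32, initial: Duration, multiplier: f64, max: Duration) -> Duration {
    let ms = initial.as_millis() as f64 * multiplier.powi(attempt as i32);
    Duration::from_millis(ms as u64).min(max)
}

fn main() {
    let initial = Duration::from_millis(500);
    let max = Duration::from_secs(30);
    assert_eq!(backoff(0, initial, 2.0, max), Duration::from_millis(500));
    assert_eq!(backoff(1, initial, 2.0, max), Duration::from_millis(1000));
    // 500ms * 2^10 = 512s, clamped to max_backoff.
    assert_eq!(backoff(10, initial, 2.0, max), max);
}
```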
SSE Parser Limits
| Limit | Value | Description |
|---|---|---|
| Buffer cap | 16 MiB | Max buffered SSE data (aligned with server) |
| Connect timeout | 30s (default) | Initial connection timeout |
HTTP Caching (Agent Card)
| Header | Default | Description |
|---|---|---|
| `Cache-Control` | `public, max-age=60` | Configurable max-age |
| `ETag` | Auto-computed | Content hash |
| `Last-Modified` | Auto-set | Timestamp of last change |
Feature Flags
a2a-protocol-server
| Feature | Default | Description |
|---|---|---|
signing | Off | Agent card signing |
tracing | Off | Structured logging via tracing crate |
sqlite | Off | SQLite-backed task and push config stores via sqlx |
websocket | Off | WebSocket transport via tokio-tungstenite |
grpc | Off | gRPC transport via tonic |
otel | Off | OpenTelemetry metrics via opentelemetry-otlp |
a2a-protocol-client
| Feature | Default | Description |
|---|---|---|
signing | Off | Agent card signing verification |
tracing | Off | Structured logging via tracing crate |
tls-rustls | Off | HTTPS via rustls (no OpenSSL dependency) |
websocket | Off | WebSocket transport via tokio-tungstenite |
grpc | Off | gRPC transport via tonic |
a2a-protocol-types
| Feature | Default | Description |
|---|---|---|
signing | Off | JWS/ES256 agent card signing (RFC 8785 canonicalization) |
a2a-protocol-sdk (umbrella)
| Feature | Default | Description |
|---|---|---|
signing | Off | Enables signing in all sub-crates |
tracing | Off | Enables tracing in client and server |
tls-rustls | Off | Enables tls-rustls in client |
grpc | Off | Enables grpc in client and server |
otel | Off | Enables otel in the server |
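Umbrella features are enabled in the dependent project's Cargo.toml; a sketch (the version number is illustrative):

```toml
[dependencies]
# Pull in client + server with TLS and structured logging enabled.
a2a-protocol-sdk = { version = "0.2", features = ["tracing", "tls-rustls"] }
```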
Environment Variables
| Variable | Description |
|---|---|
RUST_LOG | Log level filter (when tracing feature is enabled) |
Examples:
RUST_LOG=info # Info and above
RUST_LOG=debug # Debug and above
RUST_LOG=a2a_protocol_server=debug # Debug for server crate only
RUST_LOG=a2a_protocol_server=trace,a2a_protocol_client=debug # Per-crate levels
Next Steps
- API Quick Reference — All public types at a glance
- Pitfalls & Lessons Learned — Known issues and workarounds
API Quick Reference
A condensed overview of all public types, traits, and functions across the a2a-rust crates.
Wire Types (a2a-protocol-types)
Core Types
| Type | Description |
|---|---|
Task | Unit of work with ID, status, history, artifacts |
TaskId | Newtype wrapper for task identifiers |
TaskState | Enum: Submitted, Working, InputRequired, AuthRequired, Completed, Failed, Canceled, Rejected |
TaskStatus | State + optional message + timestamp |
TaskVersion | Monotonically increasing version number |
ContextId | Conversation context identifier |
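The `TaskState` variants split into in-flight and terminal states, and exhaustive matching means the compiler flags any state a handler forgets. A sketch using the variant names from the table (the `is_terminal` helper is hypothetical, not necessarily part of the public API):

```rust
// Variant names taken from the TaskState table above.
#[derive(Debug, Clone, Copy)]
enum TaskState {
    Submitted,
    Working,
    InputRequired,
    AuthRequired,
    Completed,
    Failed,
    Canceled,
    Rejected,
}

impl TaskState {
    /// Hypothetical helper: true once no further transitions occur.
    fn is_terminal(self) -> bool {
        // Exhaustive match: adding a ninth variant would be a
        // compile error here until this arm list is updated.
        match self {
            TaskState::Completed
            | TaskState::Failed
            | TaskState::Canceled
            | TaskState::Rejected => true,
            TaskState::Submitted
            | TaskState::Working
            | TaskState::InputRequired
            | TaskState::AuthRequired => false,
        }
    }
}

fn main() {
    assert!(TaskState::Completed.is_terminal());
    assert!(!TaskState::Working.is_terminal());
    println!("terminal check ok");
}
```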
Messages
| Type | Description |
|---|---|
Message | Structured payload with ID, role, parts |
MessageId | Newtype wrapper for message identifiers |
MessageRole | Enum: User, Agent |
Part | Content unit: text, file, or data |
PartContent | Enum: Text, File, Data |
Artifacts
| Type | Description |
|---|---|
Artifact | Result produced by an agent |
ArtifactId | Newtype wrapper for artifact identifiers |
Events
| Type | Description |
|---|---|
StreamResponse | Enum: Task, Message, StatusUpdate, ArtifactUpdate |
TaskStatusUpdateEvent | Status change notification |
TaskArtifactUpdateEvent | Artifact delivery notification |
Agent Card
| Type | Description |
|---|---|
AgentCard | Root discovery document |
AgentInterface | Transport endpoint descriptor |
AgentCapabilities | Capability flags (streaming, push, extended card) |
AgentSkill | Discrete agent capability |
AgentProvider | Organization info |
Parameters
| Type | Description |
|---|---|
MessageSendParams | SendMessage / SendStreamingMessage input |
SendMessageConfiguration | Output modes, history, push config |
TaskQueryParams | GetTask input |
ListTasksParams | ListTasks input with filters and pagination |
CancelTaskParams | CancelTask input |
TaskIdParams | SubscribeToTask input |
Push Notifications
| Type | Description |
|---|---|
TaskPushNotificationConfig | Webhook registration |
AuthenticationInfo | Webhook auth credentials |
Responses
| Type | Description |
|---|---|
SendMessageResponse | Enum: Task or Message |
TaskListResponse | Paginated task list |
Errors
| Type | Description |
|---|---|
A2aError | Protocol-level error |
ErrorCode | Standard error codes |
A2aResult<T> | Alias for Result<T, A2aError> |
JSON-RPC
| Type | Description |
|---|---|
JsonRpcRequest | JSON-RPC 2.0 request envelope |
JsonRpcError | JSON-RPC error object |
JsonRpcVersion | Version marker ("2.0") |
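The wire shape `JsonRpcRequest` serializes to is the standard JSON-RPC 2.0 envelope. A hand-built sketch of that shape (the method string and params here are illustrative, not taken from the A2A method list):

```rust
// Assemble a JSON-RPC 2.0 request envelope by hand to show the wire
// format. Method name and params below are illustrative placeholders.
fn json_rpc_request(id: u64, method: &str, params: &str) -> String {
    format!(
        r#"{{"jsonrpc":"2.0","id":{id},"method":"{method}","params":{params}}}"#
    )
}

fn main() {
    let req = json_rpc_request(1, "example/method", r#"{"key":"value"}"#);
    println!("{req}");
}
```

In practice the `JsonRpcRequest` type and serde handle this serialization; the sketch only shows what ends up on the wire.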
Client (a2a-protocol-client)
Core Types
| Type | Description |
|---|---|
A2aClient | Main client for calling remote agents |
ClientBuilder | Fluent builder for client configuration |
EventStream | Async SSE event stream |
RetryPolicy | Configurable retry with exponential backoff |
Client Methods
| Method | Returns | Description |
|---|---|---|
send_message(params) | SendMessageResponse | Synchronous send |
stream_message(params) | EventStream | Streaming send |
get_task(params) | Task | Retrieve task by ID |
list_tasks(params) | TaskListResponse | Query tasks |
cancel_task(id) | Task | Cancel a running task |
subscribe_to_task(id) | EventStream | Re-subscribe to task events |
set_push_config(config) | TaskPushNotificationConfig | Create push config |
get_push_config(task_id, id) | TaskPushNotificationConfig | Get push config |
list_push_configs(params) | ListPushConfigsResponse | List push configs |
delete_push_config(task_id, id) | () | Delete push config |
get_authenticated_extended_card(params) | AgentCard | Get extended card |
Interceptors
| Type | Description |
|---|---|
CallInterceptor | Request/response hook trait |
InterceptorChain | Ordered interceptor sequence |
Transport
| Type | Description |
|---|---|
Transport | Pluggable transport trait |
JsonRpcTransport | JSON-RPC 2.0 transport |
RestTransport | REST/HTTP transport |
Server (a2a-protocol-server)
Core Types
| Type | Description |
|---|---|
RequestHandler | Central protocol orchestrator |
RequestHandlerBuilder | Fluent builder for handler configuration |
RequestContext | Information about the incoming request |
Traits
| Trait | Description |
|---|---|
AgentExecutor | Agent logic entry point |
TaskStore | Task persistence backend |
PushConfigStore | Push config persistence |
PushSender | Webhook delivery |
ServerInterceptor | Server-side middleware |
AgentCardProducer | Dynamic agent card generation |
Dispatcher | HTTP dispatch trait (for serve()) |
Dispatchers
| Type | Description |
|---|---|
JsonRpcDispatcher | JSON-RPC 2.0 HTTP dispatcher (implements Dispatcher) |
RestDispatcher | RESTful HTTP dispatcher (implements Dispatcher) |
Server Startup
| Function | Description |
|---|---|
serve(addr, dispatcher) | Bind + accept loop (blocking) |
serve_with_addr(addr, dispatcher) | Bind + spawn, returns SocketAddr |
Built-in Implementations
| Type | Description |
|---|---|
InMemoryTaskStore | In-memory task store with TTL |
InMemoryPushConfigStore | In-memory push config store |
HttpPushSender | HTTP webhook delivery with SSRF protection |
StaticAgentCardHandler | Static agent card with HTTP caching |
DynamicAgentCardHandler | Dynamic agent card with producer |
Streaming
| Type | Description |
|---|---|
EventEmitter | Ergonomic event emission helper |
EventQueueWriter | Write events to a stream |
EventQueueReader | Read events from a stream |
EventQueueManager | Manages stream lifecycle |
InMemoryQueueWriter | Bounded channel writer |
InMemoryQueueReader | Bounded channel reader |
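The bounded writer/reader split can be pictured with std's `sync_channel` (a simplified stand-in: the real `InMemoryQueueWriter`/`InMemoryQueueReader` are async equivalents, and the capacity of 64 below echoes the client's default stream buffer):

```rust
use std::sync::mpsc::sync_channel;
use std::thread;

fn main() {
    // Bounded channel: sends block once 64 events are buffered,
    // giving natural backpressure on a slow consumer.
    let (writer, reader) = sync_channel::<String>(64);

    let producer = thread::spawn(move || {
        for i in 0..3 {
            writer.send(format!("event-{i}")).unwrap();
        }
        // Dropping the writer ends the stream for the reader.
    });

    for event in reader {
        println!("received {event}");
    }
    producer.join().unwrap();
}
```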
Configuration
| Type | Description |
|---|---|
CorsConfig | Cross-origin policy |
TaskStoreConfig | TTL and capacity for in-memory store |
ServerError | Server-level error type |
ServerResult<T> | Alias for Result<T, ServerError> |
SDK (a2a-protocol-sdk)
Modules
| Module | Re-exports |
|---|---|
a2a_protocol_sdk::types | All a2a-protocol-types exports |
a2a_protocol_sdk::client | All a2a-protocol-client exports |
a2a_protocol_sdk::server | All a2a-protocol-server exports |
a2a_protocol_sdk::prelude | Most commonly used types |
Prelude Contents
The prelude includes the most commonly used types from all three crates — see Project Structure for the full list.
Constructors Cheatsheet
```rust
// Task status
TaskStatus::new(TaskState::Working)
TaskStatus::with_timestamp(TaskState::Completed)

// Messages and parts
Part::text("hello")
Part::file_bytes(base64_string)
Part::file_uri("https://...")
Part::data(json_value)

// Artifacts
Artifact::new("artifact-id", vec![Part::text("content")])

// IDs
TaskId::new("task-123")
ContextId::new("ctx-456")
MessageId::new("msg-789")

// Capabilities (non_exhaustive — use builder)
AgentCapabilities::none()
    .with_streaming(true)
    .with_push_notifications(false)

// Push configs
TaskPushNotificationConfig::new("task-id", "https://webhook.url")
```
Next Steps
- Changelog — Version history
- Configuration Reference — All tunable parameters
Changelog
All notable changes to a2a-rust are documented in the project's CHANGELOG.md.
Versioning
a2a-rust follows Semantic Versioning:
- Major (1.0.0) — Breaking API changes
- Minor (0.2.0) — New features, backward compatible
- Patch (0.2.1) — Bug fixes, backward compatible
All four workspace crates share the same version number and are released together.
Release Process
Releases are triggered by pushing a version tag:
git tag v0.2.0
git push origin v0.2.0
The release workflow automatically:
- Validates all crate versions match the tag
- Runs the full CI suite
- Publishes crates to crates.io in dependency order
- Creates a GitHub release with notes
Publish Order
a2a-protocol-types → a2a-protocol-client + a2a-protocol-server → a2a-protocol-sdk
This ensures each crate's dependencies are available before it publishes.
Latest (Unreleased)
- Wave 2 inline unit tests — 110 new #[cfg(test)] tests added directly to 9 critical a2a-protocol-server source files (messaging, event_processing, push_config, lifecycle, handler/mod, REST/JSON-RPC/gRPC/WebSocket dispatchers). Total workspace test count: 1,255 passing tests.
Beyond-Spec Enhancements
- OpenTelemetry metrics (otel feature) — OtelMetrics with native OTLP export
- Connection pool metrics — ConnectionPoolStats and on_connection_pool_stats callback
- Hot-reload agent cards — HotReloadAgentCardHandler with file polling and SIGHUP
- Store migration tooling (sqlite feature) — MigrationRunner with V1–V3 built-in migrations
- Per-tenant configuration — PerTenantConfig and TenantLimits for differentiated service levels
- TenantResolver trait — HeaderTenantResolver, BearerTokenTenantResolver, PathSegmentTenantResolver
- Agent card signing E2E — test 79 in agent-team suite (signing feature)
Bug Fixes (Passes 7–8)
- Timeout errors now correctly classified as retryable (ClientError::Timeout)
- SSE parser O(n) dequeue replaced with VecDeque for O(1) pop_front
VecDequefor O(1)pop_front - Double-encoded path traversal bypass fixed with two-pass percent-decoding
- gRPC stream errors now preserve protocol error codes
- Rate limiter TOCTOU race fixed with CAS loop
- Push config store now enforces global limits (DoS prevention)
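The CAS-loop pattern behind the rate-limiter fix can be sketched with std atomics. This is a simplified token counter, not the library's actual limiter:

```rust
use std::sync::atomic::{AtomicU32, Ordering};

// A separate load-then-store is a TOCTOU race: two threads can both
// observe tokens > 0 and both decrement past the budget. The
// compare_exchange loop makes the check and the decrement one
// atomic step.
fn try_acquire(tokens: &AtomicU32) -> bool {
    let mut current = tokens.load(Ordering::Acquire);
    loop {
        if current == 0 {
            return false; // budget exhausted
        }
        match tokens.compare_exchange(
            current,
            current - 1,
            Ordering::AcqRel,
            Ordering::Acquire,
        ) {
            Ok(_) => return true,
            // Another thread raced us; retry with the fresh value.
            Err(observed) => current = observed,
        }
    }
}

fn main() {
    let tokens = AtomicU32::new(2);
    assert!(try_acquire(&tokens));
    assert!(try_acquire(&tokens));
    assert!(!try_acquire(&tokens)); // third acquire is refused
    println!("cas loop ok");
}
```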
v0.2.0 (2026-03-15)
Initial implementation of A2A v1.0.0 with all 11 protocol methods, dual transport (JSON-RPC + REST), SSE streaming, push notifications, agent card discovery, HTTP caching, enterprise hardening, and 600+ tests (workspace total now 1,255 after subsequent waves).
For the complete version history, see CHANGELOG.md.