feat: Add 0.3 protocol version compatibility layer#805
feat: Add 0.3 protocol version compatibility layer#805kabir wants to merge 1 commit intoa2aproject:mainfrom
Conversation
This passes the 0.3 TCK, and all unit tests. At the moment 0.3 and 1.0 are NOT co-hosted, you need to select one
|
/gemini review |
There was a problem hiding this comment.
Code Review
This pull request introduces a comprehensive backward compatibility layer for A2A Protocol v0.3, enabling interoperability between v1.0 SDK components and v0.3 agents. The implementation includes dedicated spec types, a conversion layer using MapStruct, and transport handlers for JSON-RPC, gRPC, and REST. Key feedback includes the need to handle authentication errors in the JDK HTTP client's GET method, fixing misleading error messages in asynchronous requests, and enhancing the SSE parser's robustness. Furthermore, the REST transport requires consistent stream completion signaling, and lazy initialization of the agent card must be made thread-safe to prevent race conditions.
| .build(); | ||
| HttpResponse<String> response = | ||
| httpClient.send(request, BodyHandlers.ofString(StandardCharsets.UTF_8)); | ||
| return new JdkHttpResponse(response); | ||
| } | ||
|
|
||
| @Override |
There was a problem hiding this comment.
The get() method does not check for HTTP_UNAUTHORIZED (401) or HTTP_FORBIDDEN (403) status codes, unlike the post() method (lines 270-274). This inconsistency means authentication errors during GET requests (like fetching the agent card) might not be reported with the standard A2A error messages, violating the A2A protocol specification for error code mappings.
@Override
public A2AHttpResponse_v0_3 get() throws IOException, InterruptedException {
HttpRequest request = createRequestBuilder(false)
.build();
HttpResponse<String> response =
httpClient.send(request, BodyHandlers.ofString(StandardCharsets.UTF_8));
if (response.statusCode() == HTTP_UNAUTHORIZED) {
throw new IOException(A2AErrorMessages.AUTHENTICATION_FAILED);
} else if (response.statusCode() == HTTP_FORBIDDEN) {
throw new IOException(A2AErrorMessages.AUTHORIZATION_FAILED);
}
return new JdkHttpResponse(response);
}References
- Adhere to the A2A protocol specification for error code mappings, even if other mappings seem more internally consistent.
| response.statusCode() != HTTP_FORBIDDEN) { | ||
| subscriber.onError(new IOException("Request failed with status " + response.statusCode() + ":" + response.body())); | ||
| } | ||
| }) |
There was a problem hiding this comment.
| if (item != null && item.startsWith("data:")) { | ||
| item = item.substring(5).trim(); | ||
| if (!item.isEmpty()) { | ||
| messageConsumer.accept(item); | ||
| } | ||
| } |
There was a problem hiding this comment.
The SSE line parser is overly simplistic. It assumes every line starts with data: and ignores all other SSE fields (like event:, id:, or comments). More importantly, it doesn't handle multi-line data correctly, as it processes each line independently. While this might work for current A2A implementations, it's fragile and doesn't adhere to the SSE specification.
| () -> { | ||
| // We don't need to do anything special on completion | ||
| })); |
There was a problem hiding this comment.
The completeRunnable passed to postAsyncSSE is empty, and RestSSEEventListener_v0_3 lacks an onComplete method. This means the client will never receive a completion signal for the stream, which is inconsistent with the JSONRPCTransport_v0_3 implementation and may cause issues for users who need to know when a stream has finished.
| resolver = new A2ACardResolver_v0_3(httpClient, agentUrl, null, getHttpHeaders(context)); | ||
| agentCard = resolver.getAgentCard(); | ||
| needsExtendedCard = agentCard.supportsAuthenticatedExtendedCard(); | ||
| } | ||
| if (!needsExtendedCard) { |
| resolver = new A2ACardResolver_v0_3(httpClient, agentUrl, null, getHttpHeaders(context)); | ||
| agentCard = resolver.getAgentCard(); | ||
| needsExtendedCard = agentCard.supportsAuthenticatedExtendedCard(); | ||
| } | ||
| if (!needsExtendedCard) { |
There was a problem hiding this comment.
Code Review
This pull request introduces a comprehensive backward compatibility layer for A2A protocol version 0.3, enabling interoperability between v1.0 SDK components and v0.3 agents. The changes include a dedicated module structure for spec types, a server-side conversion layer using MapStruct, and transport-specific client and server implementations. Feedback highlights several critical issues: a missing checked exception declaration in the REST transport, incorrect resource name patterns and stubbed methods in the gRPC transport, and inconsistent field initialization. Additionally, a potential null pointer risk was identified in the SSE event listener's error handling logic.
| return payloadAndHeaders; | ||
| } | ||
|
|
||
| private String sendPostRequest(String url, PayloadAndHeaders_v0_3 payloadAndHeaders) throws IOException, InterruptedException, JsonProcessingException_v0_3 { |
There was a problem hiding this comment.
The sendPostRequest method throws A2AClientException_v0_3 (via RestErrorMapper_v0_3.mapRestError), but this exception is not declared in the throws clause. Assuming A2AClientException_v0_3 is a checked exception (as suggested by its declaration in interface methods), this will cause a compilation error.
| private String sendPostRequest(String url, PayloadAndHeaders_v0_3 payloadAndHeaders) throws IOException, InterruptedException, JsonProcessingException_v0_3 { | |
| private String sendPostRequest(String url, PayloadAndHeaders_v0_3 payloadAndHeaders) throws IOException, InterruptedException, JsonProcessingException_v0_3, A2AClientException_v0_3 { |
References
- Application-level errors should extend Exception or RuntimeException, not java.lang.Error.
| .setParent("tasks/" + request.id()) | ||
| .build(); | ||
| PayloadAndHeaders_v0_3 payloadAndHeaders = applyInterceptors(ListTaskPushNotificationConfigRequest_v0_3.METHOD, |
There was a problem hiding this comment.
The parent field for ListTaskPushNotificationConfigRequest is incorrectly set. It should be the name of the task resource (tasks/{taskId}), but it is currently being set to a path that includes the collection suffix and a leading slash. This will likely cause the gRPC call to fail on the server side.
ListTaskPushNotificationConfigRequest grpcRequest = ListTaskPushNotificationConfigRequest.newBuilder()
.setParent("tasks/" + request.id())
.build();| public RestTransport_v0_3(@Nullable A2AHttpClient_v0_3 httpClient, AgentCard_v0_3 agentCard, | ||
| String agentUrl, @Nullable List<ClientCallInterceptor_v0_3> interceptors) { | ||
| this.httpClient = httpClient == null ? new JdkA2AHttpClient_v0_3() : httpClient; | ||
| this.agentCard = agentCard; | ||
| this.agentUrl = agentUrl.endsWith("/") ? agentUrl.substring(0, agentUrl.length() - 1) : agentUrl; | ||
| this.interceptors = interceptors; | ||
| } |
There was a problem hiding this comment.
The needsExtendedCard field is not initialized in the constructor based on the provided agentCard. This differs from the implementation in JSONRPCTransport_v0_3 and will result in getAgentCard never attempting to fetch the authenticated extended card if the transport was initialized with an existing card.
| public RestTransport_v0_3(@Nullable A2AHttpClient_v0_3 httpClient, AgentCard_v0_3 agentCard, | |
| String agentUrl, @Nullable List<ClientCallInterceptor_v0_3> interceptors) { | |
| this.httpClient = httpClient == null ? new JdkA2AHttpClient_v0_3() : httpClient; | |
| this.agentCard = agentCard; | |
| this.agentUrl = agentUrl.endsWith("/") ? agentUrl.substring(0, agentUrl.length() - 1) : agentUrl; | |
| this.interceptors = interceptors; | |
| } | |
| public RestTransport_v0_3(@Nullable A2AHttpClient_v0_3 httpClient, AgentCard_v0_3 agentCard, | |
| String agentUrl, @Nullable List<ClientCallInterceptor_v0_3> interceptors) { | |
| this.httpClient = httpClient == null ? new JdkA2AHttpClient_v0_3() : httpClient; | |
| this.agentCard = agentCard; | |
| this.agentUrl = agentUrl.endsWith("/") ? agentUrl.substring(0, agentUrl.length() - 1) : agentUrl; | |
| this.interceptors = interceptors; | |
| this.needsExtendedCard = agentCard == null || agentCard.supportsAuthenticatedExtendedCard(); | |
| } |
| public AgentCard_v0_3 getAgentCard(@Nullable ClientCallContext_v0_3 context) throws A2AClientException_v0_3 { | ||
| // TODO: Determine how to handle retrieving the authenticated extended agent card | ||
| return agentCard; | ||
| } |
There was a problem hiding this comment.
The getAgentCard method is currently a stub that just returns the cached agentCard. Unlike the JSON-RPC and REST transports, it does not attempt to fetch the card from the server or handle the authenticated extended card logic. It should be implemented using the gRPC GetAgentCard method to ensure consistency across transports.
| name.append("tasks/"); | ||
| name.append(taskId); | ||
| if (pushNotificationConfigId != null) { | ||
| name.append("/pushNotificationConfigs/"); | ||
| name.append(pushNotificationConfigId); | ||
| } | ||
| //name.append("/pushNotificationConfigs/"); | ||
| // Use taskId as default config ID if none provided | ||
| //name.append(pushNotificationConfigId != null ? pushNotificationConfigId : taskId); | ||
| return name.toString(); | ||
| } | ||
|
|
There was a problem hiding this comment.
The getTaskPushNotificationConfigName method returns an incorrect resource name when pushNotificationConfigId is null. According to the A2A v0.3 specification, the resource name for a task's push notification configuration should follow the pattern tasks/{task_id}/pushNotificationConfigs/{config_id}. If no specific config ID is provided, it should typically default to the task ID. Currently, it returns tasks/{taskId}, which the server will fail to recognize as a configuration resource.
private String getTaskPushNotificationConfigName(String taskId, @Nullable String pushNotificationConfigId) {
StringBuilder name = new StringBuilder();
name.append("tasks/");
name.append(taskId);
name.append("/pushNotificationConfigs/");
name.append(pushNotificationConfigId != null ? pushNotificationConfigId : taskId);
return name.toString();
}| // Signal normal stream completion (null error means successful completion) | ||
| log.fine("SSEEventListener.onComplete() called - signaling successful stream completion"); | ||
| if (errorHandler != null) { | ||
| log.fine("Calling errorHandler.accept(null) to signal successful completion"); |
There was a problem hiding this comment.
Passing null to the errorHandler to signal completion is risky. If the consumer implementation of errorHandler (which is a Consumer<Throwable>) does not explicitly check for null, it will throw a NullPointerException. It is better to use a separate completion callback or ensure the error handler handles nulls.
This passes the 0.3 TCK, and all unit tests.
At the moment 0.3 and 1.0 are NOT co-hosted, you need to select one