diff --git a/libraries/microsoft-agents-a365-observability-core/microsoft_agents_a365/observability/core/__init__.py b/libraries/microsoft-agents-a365-observability-core/microsoft_agents_a365/observability/core/__init__.py index ec2bdbc2..d3ecaa6f 100644 --- a/libraries/microsoft-agents-a365-observability-core/microsoft_agents_a365/observability/core/__init__.py +++ b/libraries/microsoft-agents-a365-observability-core/microsoft_agents_a365/observability/core/__init__.py @@ -19,13 +19,37 @@ unregister_span_enricher, ) from .exporters.spectra_exporter_options import SpectraExporterOptions -from .inference_call_details import InferenceCallDetails, ServiceEndpoint +from .inference_call_details import InferenceCallDetails +from .models.service_endpoint import ServiceEndpoint from .inference_operation_type import InferenceOperationType from .inference_scope import InferenceScope from .invoke_agent_details import InvokeAgentScopeDetails from .invoke_agent_scope import InvokeAgentScope from .middleware.baggage_builder import BaggageBuilder from .models.caller_details import CallerDetails +from .models.messages import ( + BlobPart, + ChatMessage, + FilePart, + FinishReason, + GenericPart, + InputMessages, + InputMessagesParam, + MessagePart, + MessageRole, + Modality, + OutputMessage, + OutputMessages, + OutputMessagesParam, + ReasoningPart, + ServerToolCallPart, + ServerToolCallResponsePart, + TextPart, + ToolCallRequestPart, + ToolCallResponsePart, + UriPart, +) +from .models.response import Response from .models.user_details import UserDetails from .opentelemetry_scope import OpenTelemetryScope from .request import Request @@ -70,12 +94,34 @@ "ToolCallDetails", "Channel", "Request", + "Response", "SpanDetails", "InferenceCallDetails", "ServiceEndpoint", # Enums "InferenceOperationType", "ToolType", + # OTEL gen-ai message format types + "MessageRole", + "FinishReason", + "Modality", + "TextPart", + "ToolCallRequestPart", + "ToolCallResponsePart", + "ReasoningPart", + "BlobPart", + "FilePart", + "UriPart", + "ServerToolCallPart", + "ServerToolCallResponsePart", + "GenericPart", + "MessagePart", + "ChatMessage", + "OutputMessage", + "InputMessages", + "OutputMessages", + "InputMessagesParam", + "OutputMessagesParam", # Utility functions "extract_context_from_headers", "get_traceparent", diff --git a/libraries/microsoft-agents-a365-observability-core/microsoft_agents_a365/observability/core/execute_tool_scope.py b/libraries/microsoft-agents-a365-observability-core/microsoft_agents_a365/observability/core/execute_tool_scope.py index ee314092..6680a310 100644 --- a/libraries/microsoft-agents-a365-observability-core/microsoft_agents_a365/observability/core/execute_tool_scope.py +++ b/libraries/microsoft-agents-a365-observability-core/microsoft_agents_a365/observability/core/execute_tool_scope.py @@ -12,6 +12,7 @@ GEN_AI_CONVERSATION_ID_KEY, GEN_AI_TOOL_ARGS_KEY, GEN_AI_TOOL_CALL_ID_KEY, + GEN_AI_TOOL_CALL_RESULT_KEY, GEN_AI_TOOL_DESCRIPTION_KEY, GEN_AI_TOOL_NAME_KEY, GEN_AI_TOOL_TYPE_KEY, @@ -21,12 +22,12 @@ USER_ID_KEY, USER_NAME_KEY, ) +from .utils import safe_json_dumps, validate_and_normalize_ip from .models.user_details import UserDetails from .opentelemetry_scope import OpenTelemetryScope from .request import Request from .span_details import SpanDetails from .tool_call_details import ToolCallDetails -from .utils import validate_and_normalize_ip class ExecuteToolScope(OpenTelemetryScope): @@ -108,7 +109,9 @@ def __init__( endpoint = details.endpoint self.set_tag_maybe(GEN_AI_TOOL_NAME_KEY, tool_name) - self.set_tag_maybe(GEN_AI_TOOL_ARGS_KEY, arguments) + if arguments is not None: + serialized = safe_json_dumps(arguments) if isinstance(arguments, dict) else arguments + self.set_tag_maybe(GEN_AI_TOOL_ARGS_KEY, serialized) self.set_tag_maybe(GEN_AI_TOOL_TYPE_KEY, tool_type) self.set_tag_maybe(GEN_AI_TOOL_CALL_ID_KEY, tool_call_id) self.set_tag_maybe(GEN_AI_TOOL_DESCRIPTION_KEY, description) @@ -134,13 +137,15 @@ def __init__( validate_and_normalize_ip(user_details.user_client_ip), ) - def record_response(self, response: str) -> None: - """Records response information for telemetry tracking. + def record_response(self, result: dict[str, object] | str) -> None: + """Record the tool call result for telemetry tracking. - Note: This method is intentionally a no-op as GEN_AI_EVENT_CONTENT was removed. - The method is kept for interface compatibility. + Per OTEL spec, the result is expected to be an object. If a string + is provided, it is recorded as-is (JSON string fallback). If a dict + is provided, it is serialized to JSON. Args: - response: The response to record + result: Tool call result as a structured dict or JSON string """ - pass + serialized = safe_json_dumps(result) if isinstance(result, dict) else result + self.set_tag_maybe(GEN_AI_TOOL_CALL_RESULT_KEY, serialized) diff --git a/libraries/microsoft-agents-a365-observability-core/microsoft_agents_a365/observability/core/execution_type.py b/libraries/microsoft-agents-a365-observability-core/microsoft_agents_a365/observability/core/execution_type.py deleted file mode 100644 index 59f5c15e..00000000 --- a/libraries/microsoft-agents-a365-observability-core/microsoft_agents_a365/observability/core/execution_type.py +++ /dev/null @@ -1,14 +0,0 @@ -# Copyright (c) Microsoft Corporation. -# Licensed under the MIT License. - -# Execution type enum. - -from enum import Enum - - -class ExecutionType(Enum): - """Enumeration for different types of agent execution contexts.""" - - AGENT_TO_AGENT = "Agent2Agent" - EVENT_TO_AGENT = "EventToAgent" - HUMAN_TO_AGENT = "HumanToAgent" diff --git a/libraries/microsoft-agents-a365-observability-core/microsoft_agents_a365/observability/core/inference_call_details.py b/libraries/microsoft-agents-a365-observability-core/microsoft_agents_a365/observability/core/inference_call_details.py index a8af76c2..f2d8dff1 100644 --- a/libraries/microsoft-agents-a365-observability-core/microsoft_agents_a365/observability/core/inference_call_details.py +++ b/libraries/microsoft-agents-a365-observability-core/microsoft_agents_a365/observability/core/inference_call_details.py @@ -4,17 +4,7 @@ from dataclasses import dataclass from .inference_operation_type import InferenceOperationType - - -@dataclass -class ServiceEndpoint: - """Represents a service endpoint with hostname and optional port.""" - - hostname: str - """The hostname of the service endpoint.""" - - port: int | None = None - """The port of the service endpoint.""" +from .models.service_endpoint import ServiceEndpoint @dataclass diff --git a/libraries/microsoft-agents-a365-observability-core/microsoft_agents_a365/observability/core/inference_scope.py b/libraries/microsoft-agents-a365-observability-core/microsoft_agents_a365/observability/core/inference_scope.py index 85833ce3..dd21dde0 100644 --- a/libraries/microsoft-agents-a365-observability-core/microsoft_agents_a365/observability/core/inference_scope.py +++ b/libraries/microsoft-agents-a365-observability-core/microsoft_agents_a365/observability/core/inference_scope.py @@ -27,6 +27,12 @@ GEN_AI_CALLER_CLIENT_IP_KEY, ) from .inference_call_details import InferenceCallDetails +from .message_utils import ( + normalize_input_messages, + normalize_output_messages, + serialize_messages, +) +from .models.messages import InputMessagesParam, OutputMessagesParam from .models.user_details import UserDetails from .opentelemetry_scope import OpenTelemetryScope from .request import Request @@ -96,8 +102,8 @@ def __init__( span_details=resolved_span_details, ) - if request.content: - self.set_tag_maybe(GEN_AI_INPUT_MESSAGES_KEY, request.content) + if request.content is not None: + self.record_input_messages(request.content) self.set_tag_maybe(GEN_AI_CONVERSATION_ID_KEY, request.conversation_id) self.set_tag_maybe(GEN_AI_OPERATION_NAME_KEY, details.operationName.value) @@ -138,21 +144,29 @@ def __init__( validate_and_normalize_ip(user_details.user_client_ip), ) - def record_input_messages(self, messages: List[str]) -> None: + def record_input_messages(self, messages: InputMessagesParam) -> None: """Records the input messages for telemetry tracking. + Accepts plain strings (auto-wrapped as OTEL ChatMessage with role ``user``) + or a versioned ``InputMessages`` wrapper. + Args: - messages: List of input messages + messages: List of input message strings or an InputMessages wrapper """ - self.set_tag_maybe(GEN_AI_INPUT_MESSAGES_KEY, safe_json_dumps(messages)) + wrapper = normalize_input_messages(messages) + self.set_tag_maybe(GEN_AI_INPUT_MESSAGES_KEY, serialize_messages(wrapper)) - def record_output_messages(self, messages: List[str]) -> None: + def record_output_messages(self, messages: OutputMessagesParam) -> None: """Records the output messages for telemetry tracking. + Accepts plain strings (auto-wrapped as OTEL OutputMessage with role ``assistant``) + or a versioned ``OutputMessages`` wrapper. + Args: - messages: List of output messages + messages: List of output message strings or an OutputMessages wrapper """ - self.set_tag_maybe(GEN_AI_OUTPUT_MESSAGES_KEY, safe_json_dumps(messages)) + wrapper = normalize_output_messages(messages) + self.set_tag_maybe(GEN_AI_OUTPUT_MESSAGES_KEY, serialize_messages(wrapper)) def record_input_tokens(self, input_tokens: int) -> None: """Records the number of input tokens for telemetry tracking. diff --git a/libraries/microsoft-agents-a365-observability-core/microsoft_agents_a365/observability/core/invoke_agent_details.py b/libraries/microsoft-agents-a365-observability-core/microsoft_agents_a365/observability/core/invoke_agent_details.py index 33f99540..94655a1f 100644 --- a/libraries/microsoft-agents-a365-observability-core/microsoft_agents_a365/observability/core/invoke_agent_details.py +++ b/libraries/microsoft-agents-a365-observability-core/microsoft_agents_a365/observability/core/invoke_agent_details.py @@ -4,11 +4,12 @@ # Data class for invoke agent scope details. from dataclasses import dataclass -from urllib.parse import ParseResult + +from .models.service_endpoint import ServiceEndpoint @dataclass class InvokeAgentScopeDetails: """Scope-level configuration for agent invocation tracing.""" - endpoint: ParseResult | None = None + endpoint: ServiceEndpoint | None = None diff --git a/libraries/microsoft-agents-a365-observability-core/microsoft_agents_a365/observability/core/invoke_agent_scope.py b/libraries/microsoft-agents-a365-observability-core/microsoft_agents_a365/observability/core/invoke_agent_scope.py index eb0d9a56..8a7d91b0 100644 --- a/libraries/microsoft-agents-a365-observability-core/microsoft_agents_a365/observability/core/invoke_agent_scope.py +++ b/libraries/microsoft-agents-a365-observability-core/microsoft_agents_a365/observability/core/invoke_agent_scope.py @@ -31,11 +31,17 @@ USER_NAME_KEY, ) from .invoke_agent_details import InvokeAgentScopeDetails +from .message_utils import ( + normalize_input_messages, + normalize_output_messages, + serialize_messages, +) from .models.caller_details import CallerDetails +from .models.messages import InputMessagesParam, OutputMessagesParam from .opentelemetry_scope import OpenTelemetryScope from .request import Request from .span_details import SpanDetails -from .utils import safe_json_dumps, validate_and_normalize_ip +from .utils import validate_and_normalize_ip logger = logging.getLogger(__name__) @@ -129,8 +135,8 @@ def __init__( if request.channel: self.set_tag_maybe(CHANNEL_NAME_KEY, request.channel.name) self.set_tag_maybe(CHANNEL_LINK_KEY, request.channel.link) - if request.content: - self.set_tag_maybe(GEN_AI_INPUT_MESSAGES_KEY, safe_json_dumps([request.content])) + if request.content is not None: + self.record_input_messages(request.content) # Set caller details tags if caller_details: @@ -176,18 +182,26 @@ def record_response(self, response: str) -> None: """ self.record_output_messages([response]) - def record_input_messages(self, messages: list[str]) -> None: + def record_input_messages(self, messages: InputMessagesParam) -> None: """Record the input messages for telemetry tracking. + Accepts plain strings (auto-wrapped as OTEL ChatMessage with role ``user``) + or a versioned ``InputMessages`` wrapper. + Args: - messages: List of input messages to record + messages: List of input message strings or an InputMessages wrapper """ - self.set_tag_maybe(GEN_AI_INPUT_MESSAGES_KEY, safe_json_dumps(messages)) + wrapper = normalize_input_messages(messages) + self.set_tag_maybe(GEN_AI_INPUT_MESSAGES_KEY, serialize_messages(wrapper)) - def record_output_messages(self, messages: list[str]) -> None: + def record_output_messages(self, messages: OutputMessagesParam) -> None: """Record the output messages for telemetry tracking. + Accepts plain strings (auto-wrapped as OTEL OutputMessage with role ``assistant``) + or a versioned ``OutputMessages`` wrapper. + Args: - messages: List of output messages to record + messages: List of output message strings or an OutputMessages wrapper """ - self.set_tag_maybe(GEN_AI_OUTPUT_MESSAGES_KEY, safe_json_dumps(messages)) + wrapper = normalize_output_messages(messages) + self.set_tag_maybe(GEN_AI_OUTPUT_MESSAGES_KEY, serialize_messages(wrapper)) diff --git a/libraries/microsoft-agents-a365-observability-core/microsoft_agents_a365/observability/core/message_utils.py b/libraries/microsoft-agents-a365-observability-core/microsoft_agents_a365/observability/core/message_utils.py new file mode 100644 index 00000000..cfc03ae0 --- /dev/null +++ b/libraries/microsoft-agents-a365-observability-core/microsoft_agents_a365/observability/core/message_utils.py @@ -0,0 +1,150 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. + +"""Conversion and serialization helpers for OTEL gen-ai message format. + +Provides normalization from plain ``list[str]`` (backward compat) to the +versioned wrapper format, and a non-throwing ``serialize_messages`` function. +""" + +from __future__ import annotations + +import json +import logging +from dataclasses import asdict +from enum import Enum +from typing import Union + +from .models.messages import ( + A365_MESSAGE_SCHEMA_VERSION, + ChatMessage, + InputMessages, + InputMessagesParam, + MessageRole, + OutputMessage, + OutputMessages, + OutputMessagesParam, + TextPart, +) + +logger = logging.getLogger(__name__) + + +def is_string_list( + param: Union[InputMessagesParam, OutputMessagesParam], +) -> bool: + """Return ``True`` when *param* is a plain ``list[str]``.""" + return isinstance(param, list) and all(isinstance(item, str) for item in param) + + +def is_wrapped_messages( + param: Union[InputMessagesParam, OutputMessagesParam], +) -> bool: + """Return ``True`` when *param* is a versioned wrapper.""" + return isinstance(param, (InputMessages, OutputMessages)) + + +# --------------------------------------------------------------------------- +# Plain-string → structured conversion +# --------------------------------------------------------------------------- + + +def to_input_messages(messages: list[str]) -> list[ChatMessage]: + """Convert plain input strings into OTEL ``ChatMessage`` objects.""" + return [ + ChatMessage(role=MessageRole.USER, parts=[TextPart(content=content)]) + for content in messages + ] + + +def to_output_messages(messages: list[str]) -> list[OutputMessage]: + """Convert plain output strings into OTEL ``OutputMessage`` objects.""" + return [ + OutputMessage(role=MessageRole.ASSISTANT, parts=[TextPart(content=content)]) + for content in messages + ] + + +# --------------------------------------------------------------------------- +# Normalization (union → versioned wrapper) +# --------------------------------------------------------------------------- + + +def normalize_input_messages(param: InputMessagesParam) -> InputMessages: + """Normalize an ``InputMessagesParam`` to a versioned ``InputMessages`` wrapper. + + - ``str`` → wrapped in a single-element list, then converted. + - ``list[str]`` → converted to ``ChatMessage`` list and wrapped. + - ``InputMessages`` → returned as-is. + """ + if isinstance(param, str): + return InputMessages(messages=to_input_messages([param])) + if is_string_list(param): + return InputMessages(messages=to_input_messages(param)) # type: ignore[arg-type] + return param # type: ignore[return-value] + + +def normalize_output_messages(param: OutputMessagesParam) -> OutputMessages: + """Normalize an ``OutputMessagesParam`` to a versioned ``OutputMessages`` wrapper. + + - ``str`` → wrapped in a single-element list, then converted. + - ``list[str]`` → converted to ``OutputMessage`` list and wrapped. + - ``OutputMessages`` → returned as-is. + """ + if isinstance(param, str): + return OutputMessages(messages=to_output_messages([param])) + if is_string_list(param): + return OutputMessages(messages=to_output_messages(param)) # type: ignore[arg-type] + return param # type: ignore[return-value] + + +# --------------------------------------------------------------------------- +# Serialization +# --------------------------------------------------------------------------- + + +def _message_dict_factory(items: list[tuple[str, object]]) -> dict[str, object]: + """Custom dict factory for ``dataclasses.asdict``. + + Drops ``None`` values and converts enum members to their string value. + """ + return {k: (v.value if isinstance(v, Enum) else v) for k, v in items if v is not None} + + +def serialize_messages( + wrapper: Union[InputMessages, OutputMessages], +) -> str: + """Serialize a versioned message wrapper to JSON. + + The output is the full wrapper object: + ``{"version":"0.1.0","messages":[...]}``. + + The try/except ensures telemetry recording is non-throwing even when + message parts contain non-JSON-serializable values. + """ + try: + return json.dumps( + asdict(wrapper, dict_factory=_message_dict_factory), + default=str, + ensure_ascii=False, + ) + except Exception: + logger.warning("Failed to serialize messages; using fallback.", exc_info=True) + messages = getattr(wrapper, "messages", []) + count = len(messages) if isinstance(messages, list) else 0 + noun = "message" if count == 1 else "messages" + fallback = { + "version": A365_MESSAGE_SCHEMA_VERSION, + "messages": [ + { + "role": MessageRole.SYSTEM.value, + "parts": [ + { + "type": "text", + "content": f"[serialization failed: {count} {noun}]", + } + ], + } + ], + } + return json.dumps(fallback, ensure_ascii=False) diff --git a/libraries/microsoft-agents-a365-observability-core/microsoft_agents_a365/observability/core/models/messages.py b/libraries/microsoft-agents-a365-observability-core/microsoft_agents_a365/observability/core/models/messages.py new file mode 100644 index 00000000..962f3932 --- /dev/null +++ b/libraries/microsoft-agents-a365-observability-core/microsoft_agents_a365/observability/core/models/messages.py @@ -0,0 +1,221 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. + +"""OTEL gen-ai semantic convention message types. + +Defines the structured message format for input/output message tracing, +following the OpenTelemetry gen-ai semantic conventions: + https://opentelemetry.io/docs/specs/semconv/gen-ai/gen-ai-input-messages.json + https://opentelemetry.io/docs/specs/semconv/gen-ai/gen-ai-output-messages.json +""" + +from __future__ import annotations + +from dataclasses import dataclass, field +from enum import Enum +from typing import Union + +# --------------------------------------------------------------------------- +# Enums +# --------------------------------------------------------------------------- + + +class MessageRole(Enum): + """Role of a message participant per OTEL gen-ai semantic conventions.""" + + SYSTEM = "system" + USER = "user" + ASSISTANT = "assistant" + TOOL = "tool" + + +class FinishReason(Enum): + """Reason a model stopped generating per OTEL gen-ai semantic conventions.""" + + STOP = "stop" + LENGTH = "length" + CONTENT_FILTER = "content_filter" + TOOL_CALL = "tool_call" + ERROR = "error" + + +class Modality(Enum): + """Media modality for blob, file, and URI parts.""" + + IMAGE = "image" + VIDEO = "video" + AUDIO = "audio" + + +# --------------------------------------------------------------------------- +# Message part types (discriminated on ``type``) +# --------------------------------------------------------------------------- + + +@dataclass +class TextPart: + """Plain text content.""" + + content: str + type: str = field(default="text", init=False) + + +@dataclass +class ToolCallRequestPart: + """A tool call requested by the model.""" + + name: str + id: str | None = None + arguments: dict[str, object] | list[object] | None = None + type: str = field(default="tool_call", init=False) + + +@dataclass +class ToolCallResponsePart: + """Result of a tool call.""" + + id: str | None = None + response: object | None = None + type: str = field(default="tool_call_response", init=False) + + +@dataclass +class ReasoningPart: + """Model reasoning / chain-of-thought content.""" + + content: str + type: str = field(default="reasoning", init=False) + + +@dataclass +class BlobPart: + """Inline binary data (base64-encoded).""" + + modality: Modality | str + content: str + mime_type: str | None = None + type: str = field(default="blob", init=False) + + +@dataclass +class FilePart: + """Reference to a pre-uploaded file.""" + + modality: Modality | str + file_id: str + mime_type: str | None = None + type: str = field(default="file", init=False) + + +@dataclass +class UriPart: + """External URI reference.""" + + modality: Modality | str + uri: str + mime_type: str | None = None + type: str = field(default="uri", init=False) + + +@dataclass +class ServerToolCallPart: + """Server-side tool invocation.""" + + name: str + server_tool_call: dict[str, object] + id: str | None = None + type: str = field(default="server_tool_call", init=False) + + +@dataclass +class ServerToolCallResponsePart: + """Server-side tool response.""" + + server_tool_call_response: dict[str, object] + id: str | None = None + type: str = field(default="server_tool_call_response", init=False) + + +@dataclass +class GenericPart: + """Extensible part for custom / future types.""" + + type: str + data: dict[str, object] = field(default_factory=dict) + + +MessagePart = Union[ + TextPart, + ToolCallRequestPart, + ToolCallResponsePart, + ReasoningPart, + BlobPart, + FilePart, + UriPart, + ServerToolCallPart, + ServerToolCallResponsePart, + GenericPart, +] +"""Union of all message part types per OTEL gen-ai semantic conventions.""" + + +# --------------------------------------------------------------------------- +# Message types +# --------------------------------------------------------------------------- + + +@dataclass +class ChatMessage: + """An input message sent to a model (OTEL gen-ai semantic conventions).""" + + role: MessageRole + parts: list[MessagePart] = field(default_factory=list) + name: str | None = None + + +@dataclass +class OutputMessage(ChatMessage): + """An output message produced by a model (OTEL gen-ai semantic conventions).""" + + finish_reason: str | None = None + + +# --------------------------------------------------------------------------- +# Versioned wrappers +# --------------------------------------------------------------------------- + +A365_MESSAGE_SCHEMA_VERSION: str = "0.1.0" +"""Schema version embedded in serialized message payloads.""" + + +@dataclass +class InputMessages: + """Versioned wrapper for input messages.""" + + messages: list[ChatMessage] = field(default_factory=list) + version: str = field(default=A365_MESSAGE_SCHEMA_VERSION, init=False) + + +@dataclass +class OutputMessages: + """Versioned wrapper for output messages.""" + + messages: list[OutputMessage] = field(default_factory=list) + version: str = field(default=A365_MESSAGE_SCHEMA_VERSION, init=False) + + +# --------------------------------------------------------------------------- +# Parameter type aliases (backward-compatible union types) +# --------------------------------------------------------------------------- + +InputMessagesParam = Union[str, list[str], InputMessages] +"""Accepted input for ``record_input_messages``. + +Supports a single string, a list of strings (backward compat), or the versioned wrapper. +""" + +OutputMessagesParam = Union[str, list[str], OutputMessages] +"""Accepted input for ``record_output_messages``. + +Supports a single string, a list of strings (backward compat), or the versioned wrapper. +""" diff --git a/libraries/microsoft-agents-a365-observability-core/microsoft_agents_a365/observability/core/models/response.py b/libraries/microsoft-agents-a365-observability-core/microsoft_agents_a365/observability/core/models/response.py index 0e4ae9fd..137f49bd 100644 --- a/libraries/microsoft-agents-a365-observability-core/microsoft_agents_a365/observability/core/models/response.py +++ b/libraries/microsoft-agents-a365-observability-core/microsoft_agents_a365/observability/core/models/response.py @@ -1,12 +1,28 @@ # Copyright (c) Microsoft Corporation. # Licensed under the MIT License. +from __future__ import annotations + from dataclasses import dataclass +from typing import Union + +from .messages import OutputMessagesParam + +ResponseMessagesParam = Union[OutputMessagesParam, dict[str, object]] +"""Accepted type for Response.messages. + +Supports plain strings, ``OutputMessages``, or a structured tool result dict. +A ``dict[str, object]`` is treated as a tool call result per OTEL spec and +serialized directly via ``json.dumps``. +""" @dataclass class Response: - """Response details from agent execution.""" + """Response details from agent execution. + + Accepts plain strings (backward compat), structured ``OutputMessages``, + or a ``dict`` for tool call results (per OTEL spec). + """ - """The list of response messages from the agent.""" - messages: list[str] + messages: ResponseMessagesParam diff --git a/libraries/microsoft-agents-a365-observability-core/microsoft_agents_a365/observability/core/models/service_endpoint.py b/libraries/microsoft-agents-a365-observability-core/microsoft_agents_a365/observability/core/models/service_endpoint.py new file mode 100644 index 00000000..5382ab6c --- /dev/null +++ b/libraries/microsoft-agents-a365-observability-core/microsoft_agents_a365/observability/core/models/service_endpoint.py @@ -0,0 +1,15 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. + +from dataclasses import dataclass + + +@dataclass +class ServiceEndpoint: + """Represents a service endpoint with hostname and optional port.""" + + hostname: str + """The hostname of the service endpoint.""" + + port: int | None = None + """The port of the service endpoint.""" diff --git a/libraries/microsoft-agents-a365-observability-core/microsoft_agents_a365/observability/core/request.py b/libraries/microsoft-agents-a365-observability-core/microsoft_agents_a365/observability/core/request.py index 1523fbab..b351ca5d 100644 --- a/libraries/microsoft-agents-a365-observability-core/microsoft_agents_a365/observability/core/request.py +++ b/libraries/microsoft-agents-a365-observability-core/microsoft_agents_a365/observability/core/request.py @@ -3,16 +3,19 @@ # Request class. +from __future__ import annotations + from dataclasses import dataclass from .channel import Channel +from .models.messages import InputMessagesParam @dataclass class Request: """Request details for agent execution.""" - content: str | None = None + content: InputMessagesParam | None = None session_id: str | None = None channel: Channel | None = None conversation_id: str | None = None diff --git a/libraries/microsoft-agents-a365-observability-core/microsoft_agents_a365/observability/core/spans_scopes/output_scope.py b/libraries/microsoft-agents-a365-observability-core/microsoft_agents_a365/observability/core/spans_scopes/output_scope.py index 2913fe70..db837620 100644 --- a/libraries/microsoft-agents-a365-observability-core/microsoft_agents_a365/observability/core/spans_scopes/output_scope.py +++ b/libraries/microsoft-agents-a365-observability-core/microsoft_agents_a365/observability/core/spans_scopes/output_scope.py @@ -12,7 +12,9 @@ USER_ID_KEY, USER_NAME_KEY, ) -from ..models.response import Response +from ..message_utils import normalize_output_messages, serialize_messages +from ..models.messages import OutputMessages +from ..models.response import Response, ResponseMessagesParam from ..models.user_details import UserDetails from ..opentelemetry_scope import OpenTelemetryScope from ..request import Request @@ -23,9 +25,13 @@ class OutputScope(OpenTelemetryScope): - """Provides OpenTelemetry tracing scope for output messages.""" + """Provides OpenTelemetry tracing scope for output messages. - _MAX_OUTPUT_MESSAGES = 5000 + Output messages are set once (via the constructor or ``record_output_messages``) + rather than accumulated. For streaming scenarios, the agent developer should + collect all output (e.g. via a list or string builder) and pass the final + result to ``OutputScope``. + """ @staticmethod def start( @@ -87,12 +93,7 @@ def __init__( ) self.set_tag_maybe(GEN_AI_CONVERSATION_ID_KEY, request.conversation_id) - - # Initialize accumulated messages list - self._output_messages: list[str] = list(response.messages) - - # Set response messages - self.set_tag_maybe(GEN_AI_OUTPUT_MESSAGES_KEY, safe_json_dumps(self._output_messages)) + self._set_output(response.messages) # Set user details if provided if user_details: @@ -104,16 +105,24 @@ def __init__( validate_and_normalize_ip(user_details.user_client_ip), ) - def record_output_messages(self, messages: list[str]) -> None: + def _set_output(self, messages: ResponseMessagesParam) -> None: + """Serialize and set the output messages attribute on the span.""" + if isinstance(messages, dict): + self.set_tag_maybe(GEN_AI_OUTPUT_MESSAGES_KEY, safe_json_dumps(messages)) + else: + normalized = normalize_output_messages(messages) + wrapper = OutputMessages(messages=list(normalized.messages)) + self.set_tag_maybe(GEN_AI_OUTPUT_MESSAGES_KEY, serialize_messages(wrapper)) + + def record_output_messages(self, messages: ResponseMessagesParam) -> None: """Records the output messages for telemetry tracking. - Appends the provided messages to the accumulated output messages list. - The list is capped at _MAX_OUTPUT_MESSAGES to prevent unbounded memory growth. + Overwrites any previously set output messages. Accepts a single string, + a list of strings (auto-wrapped as OTEL OutputMessage), a versioned + ``OutputMessages`` wrapper, or a ``dict[str, object]`` for tool call + results (per OTEL spec). Args: - messages: List of output messages to append + messages: String(s), OutputMessages, or dict for tool call results """ - self._output_messages.extend(messages) - if len(self._output_messages) > self._MAX_OUTPUT_MESSAGES: - self._output_messages = self._output_messages[-self._MAX_OUTPUT_MESSAGES :] - self.set_tag_maybe(GEN_AI_OUTPUT_MESSAGES_KEY, safe_json_dumps(self._output_messages)) + self._set_output(messages) diff --git a/libraries/microsoft-agents-a365-observability-core/microsoft_agents_a365/observability/core/tenant_details.py b/libraries/microsoft-agents-a365-observability-core/microsoft_agents_a365/observability/core/tenant_details.py deleted file mode 100644 index a9186fcb..00000000 --- a/libraries/microsoft-agents-a365-observability-core/microsoft_agents_a365/observability/core/tenant_details.py +++ /dev/null @@ -1,12 +0,0 @@ -# Copyright (c) Microsoft Corporation. -# Licensed under the MIT License. - -# Tenant details class. -from dataclasses import dataclass - - -@dataclass -class TenantDetails: - """Represents the tenant id attached to the span.""" - - tenant_id: str diff --git a/libraries/microsoft-agents-a365-observability-core/microsoft_agents_a365/observability/core/tool_call_details.py b/libraries/microsoft-agents-a365-observability-core/microsoft_agents_a365/observability/core/tool_call_details.py index fbcaafb9..b36c577b 100644 --- a/libraries/microsoft-agents-a365-observability-core/microsoft_agents_a365/observability/core/tool_call_details.py +++ b/libraries/microsoft-agents-a365-observability-core/microsoft_agents_a365/observability/core/tool_call_details.py @@ -3,8 +3,11 @@ # Data class for tool call details. +from __future__ import annotations + from dataclasses import dataclass -from urllib.parse import ParseResult + +from .models.service_endpoint import ServiceEndpoint @dataclass @@ -12,8 +15,8 @@ class ToolCallDetails: """Details of a tool call made by an agent in the system.""" tool_name: str - arguments: str | None = None + arguments: dict[str, object] | str | None = None tool_call_id: str | None = None description: str | None = None tool_type: str | None = None - endpoint: ParseResult | None = None + endpoint: ServiceEndpoint | None = None diff --git a/tests/observability/core/test_execute_tool_scope.py b/tests/observability/core/test_execute_tool_scope.py index 169d72d6..a9c292a1 100644 --- a/tests/observability/core/test_execute_tool_scope.py +++ b/tests/observability/core/test_execute_tool_scope.py @@ -84,7 +84,6 @@ def test_record_response_method_exists(self): scope = ExecuteToolScope.start(Request(), self.tool_details, self.agent_details) if scope is not None: - # Test that the method exists self.assertTrue(hasattr(scope, "record_response")) self.assertTrue(callable(scope.record_response)) scope.dispose() diff --git a/tests/observability/core/test_invoke_agent_scope.py b/tests/observability/core/test_invoke_agent_scope.py index 2996aa82..aeb6a660 100644 --- a/tests/observability/core/test_invoke_agent_scope.py +++ b/tests/observability/core/test_invoke_agent_scope.py @@ -5,7 +5,6 @@ import sys import unittest from pathlib import Path -from urllib.parse import urlparse import pytest from microsoft_agents_a365.observability.core import ( @@ -15,6 +14,7 @@ InvokeAgentScope, InvokeAgentScopeDetails, Request, + ServiceEndpoint, SpanDetails, UserDetails, configure, @@ -62,7 +62,7 @@ def setUpClass(cls): agent_description="A test agent for invoke scope testing", ) cls.invoke_scope_details = InvokeAgentScopeDetails( - endpoint=urlparse("https://example.com/agent"), + endpoint=ServiceEndpoint(hostname="example.com", port=443), ) # Create channel for requests diff --git a/tests/observability/core/test_message_utils.py b/tests/observability/core/test_message_utils.py new file mode 100644 index 00000000..e4c75f46 --- /dev/null +++ b/tests/observability/core/test_message_utils.py @@ -0,0 +1,311 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. + +"""Tests for message_utils conversion, normalization, and serialization helpers.""" + +import json +import sys +import unittest +from pathlib import Path + +import pytest +from microsoft_agents_a365.observability.core.message_utils import ( + is_string_list, + is_wrapped_messages, + normalize_input_messages, + normalize_output_messages, + serialize_messages, + to_input_messages, + to_output_messages, +) +from microsoft_agents_a365.observability.core.models.messages import ( + A365_MESSAGE_SCHEMA_VERSION, + BlobPart, + ChatMessage, + FinishReason, + InputMessages, + MessageRole, + OutputMessage, + OutputMessages, + ReasoningPart, + TextPart, + ToolCallRequestPart, +) + + +class TestTypeGuards(unittest.TestCase): + """Tests for is_string_list and is_wrapped_messages type guards.""" + + def test_is_string_list_with_strings(self): + self.assertTrue(is_string_list(["hello", "world"])) + + def test_is_string_list_with_empty_list(self): + self.assertTrue(is_string_list([])) + + def test_is_string_list_with_input_messages(self): + wrapper = InputMessages(messages=[]) + self.assertFalse(is_string_list(wrapper)) + + def test_is_string_list_with_output_messages(self): + wrapper = OutputMessages(messages=[]) + self.assertFalse(is_string_list(wrapper)) + + def test_is_wrapped_messages_with_input_messages(self): + wrapper = InputMessages(messages=[]) + self.assertTrue(is_wrapped_messages(wrapper)) + + def test_is_wrapped_messages_with_output_messages(self): + wrapper = OutputMessages(messages=[]) + self.assertTrue(is_wrapped_messages(wrapper)) + + def test_is_wrapped_messages_with_string_list(self): + self.assertFalse(is_wrapped_messages(["hello"])) + + def test_is_wrapped_messages_with_empty_list(self): + self.assertFalse(is_wrapped_messages([])) + + +class TestConversion(unittest.TestCase): + """Tests for to_input_messages and to_output_messages.""" + + def test_to_input_messages_single(self): + result = to_input_messages(["Hello"]) + self.assertEqual(len(result), 1) + self.assertEqual(result[0].role, MessageRole.USER) + self.assertEqual(len(result[0].parts), 1) + self.assertIsInstance(result[0].parts[0], TextPart) + self.assertEqual(result[0].parts[0].content, "Hello") + + def test_to_input_messages_multiple(self): + result = to_input_messages(["Hello", "How are you?"]) + self.assertEqual(len(result), 2) + self.assertEqual(result[0].parts[0].content, "Hello") + self.assertEqual(result[1].parts[0].content, "How are you?") + + def test_to_input_messages_empty(self): + result = to_input_messages([]) + self.assertEqual(result, []) + + def test_to_output_messages_single(self): + result = to_output_messages(["Response text"]) + self.assertEqual(len(result), 1) + self.assertIsInstance(result[0], OutputMessage) + self.assertEqual(result[0].role, MessageRole.ASSISTANT) + self.assertEqual(result[0].parts[0].content, "Response text") + + def test_to_output_messages_multiple(self): + result = to_output_messages(["First", "Second"]) + self.assertEqual(len(result), 2) + for msg in result: + self.assertEqual(msg.role, MessageRole.ASSISTANT) + + def test_to_output_messages_empty(self): + result = to_output_messages([]) + self.assertEqual(result, []) + + +class TestNormalization(unittest.TestCase): + """Tests for normalize_input_messages and normalize_output_messages.""" + + def test_normalize_input_from_strings(self): + result = normalize_input_messages(["Hello"]) + self.assertIsInstance(result, InputMessages) + self.assertEqual(result.version, A365_MESSAGE_SCHEMA_VERSION) + self.assertEqual(len(result.messages), 1) + self.assertEqual(result.messages[0].role, MessageRole.USER) + + def test_normalize_input_from_wrapper(self): + wrapper = InputMessages( + messages=[ + ChatMessage(role=MessageRole.SYSTEM, parts=[TextPart(content="System prompt")]) + ] + ) + result = normalize_input_messages(wrapper) + self.assertIs(result, wrapper) + + def test_normalize_output_from_strings(self): + result = normalize_output_messages(["Response"]) + self.assertIsInstance(result, OutputMessages) + self.assertEqual(result.version, A365_MESSAGE_SCHEMA_VERSION) + self.assertEqual(len(result.messages), 1) + self.assertEqual(result.messages[0].role, MessageRole.ASSISTANT) + + def test_normalize_output_from_wrapper(self): + wrapper = OutputMessages( + messages=[ + OutputMessage( + role=MessageRole.ASSISTANT, + parts=[TextPart(content="Answer")], + finish_reason=FinishReason.STOP.value, + ) + ] + ) + result = normalize_output_messages(wrapper) + self.assertIs(result, wrapper) + + def test_normalize_input_empty_list(self): + result = normalize_input_messages([]) + self.assertIsInstance(result, InputMessages) + self.assertEqual(result.messages, []) + + def test_normalize_output_empty_list(self): + result = normalize_output_messages([]) + self.assertIsInstance(result, OutputMessages) + self.assertEqual(result.messages, []) + + +class TestSerialization(unittest.TestCase): + """Tests for serialize_messages.""" + + def test_serialize_input_messages(self): + wrapper = InputMessages( + messages=[ChatMessage(role=MessageRole.USER, parts=[TextPart(content="Hello")])] + ) + result = serialize_messages(wrapper) + parsed = json.loads(result) + + self.assertEqual(parsed["version"], A365_MESSAGE_SCHEMA_VERSION) + self.assertEqual(len(parsed["messages"]), 1) + self.assertEqual(parsed["messages"][0]["role"], "user") + self.assertEqual(parsed["messages"][0]["parts"][0]["type"], "text") + self.assertEqual(parsed["messages"][0]["parts"][0]["content"], "Hello") + + def test_serialize_output_messages(self): + wrapper = OutputMessages( + messages=[ + OutputMessage( + role=MessageRole.ASSISTANT, + parts=[TextPart(content="Response")], + finish_reason=FinishReason.STOP.value, + ) + ] + ) + result = serialize_messages(wrapper) + parsed = json.loads(result) + + self.assertEqual(parsed["version"], A365_MESSAGE_SCHEMA_VERSION) + self.assertEqual(parsed["messages"][0]["role"], "assistant") + self.assertEqual(parsed["messages"][0]["finish_reason"], "stop") + + def test_serialize_omits_none_values(self): + wrapper = InputMessages( + messages=[ChatMessage(role=MessageRole.USER, parts=[TextPart(content="Hi")])] + ) + result = serialize_messages(wrapper) + parsed = json.loads(result) + + # name is None so should not appear + self.assertNotIn("name", parsed["messages"][0]) + + def test_serialize_complex_parts(self): + wrapper = InputMessages( + messages=[ + ChatMessage( + role=MessageRole.USER, + parts=[ + TextPart(content="Analyze this image"), + BlobPart(modality="image", content="base64data", mime_type="image/png"), + ], + ) + ] + ) + result = serialize_messages(wrapper) + parsed = json.loads(result) + + parts = parsed["messages"][0]["parts"] + self.assertEqual(len(parts), 2) + self.assertEqual(parts[0]["type"], "text") + self.assertEqual(parts[1]["type"], "blob") + self.assertEqual(parts[1]["modality"], "image") + self.assertEqual(parts[1]["content"], "base64data") + + def test_serialize_with_tool_call_part(self): + wrapper = OutputMessages( + messages=[ + OutputMessage( + role=MessageRole.ASSISTANT, + parts=[ + ToolCallRequestPart( + name="search", + id="call_123", + arguments={"query": "GDPR"}, + ) + ], + ) + ] + ) + result = serialize_messages(wrapper) + parsed = json.loads(result) + + part = parsed["messages"][0]["parts"][0] + self.assertEqual(part["type"], "tool_call") + self.assertEqual(part["name"], "search") + self.assertEqual(part["id"], "call_123") + self.assertEqual(part["arguments"], {"query": "GDPR"}) + + def test_serialize_with_reasoning_part(self): + wrapper = OutputMessages( + messages=[ + OutputMessage( + role=MessageRole.ASSISTANT, + parts=[ + ReasoningPart(content="Checking GDPR Article 5"), + TextPart(content="Based on GDPR..."), + ], + finish_reason=FinishReason.STOP.value, + ) + ] + ) + result = serialize_messages(wrapper) + parsed = json.loads(result) + + parts = parsed["messages"][0]["parts"] + self.assertEqual(parts[0]["type"], "reasoning") + self.assertEqual(parts[0]["content"], "Checking GDPR Article 5") + self.assertEqual(parts[1]["type"], "text") + + def test_serialize_unicode(self): + wrapper = InputMessages( + messages=[ + ChatMessage( + role=MessageRole.USER, + parts=[TextPart(content="日本語テスト 🚀")], + ) + ] + ) + result = serialize_messages(wrapper) + self.assertIn("日本語テスト", result) + self.assertIn("🚀", result) + + def test_serialize_empty_messages(self): + wrapper = InputMessages(messages=[]) + result = serialize_messages(wrapper) + parsed = json.loads(result) + self.assertEqual(parsed["version"], A365_MESSAGE_SCHEMA_VERSION) + self.assertEqual(parsed["messages"], []) + + +class TestVersionField(unittest.TestCase): + """Tests for the version field on wrappers.""" + + def test_input_messages_version_is_constant(self): + wrapper = InputMessages(messages=[]) + self.assertEqual(wrapper.version, A365_MESSAGE_SCHEMA_VERSION) + + def test_output_messages_version_is_constant(self): + wrapper = OutputMessages(messages=[]) + self.assertEqual(wrapper.version, A365_MESSAGE_SCHEMA_VERSION) + + def test_version_not_settable_via_constructor(self): + """Version field uses init=False so it cannot be passed as a constructor arg.""" + with self.assertRaises(TypeError): + InputMessages(messages=[], version="99.99.99") # type: ignore[call-arg] + + def test_version_embedded_in_serialized_output(self): + wrapper = InputMessages(messages=[]) + result = json.loads(serialize_messages(wrapper)) + self.assertEqual(result["version"], A365_MESSAGE_SCHEMA_VERSION) + + +if __name__ == "__main__": + sys.exit(pytest.main([str(Path(__file__))] + sys.argv[1:])) diff --git a/tests/observability/core/test_output_scope.py b/tests/observability/core/test_output_scope.py index fe6ba164..f2b2526c 100644 --- a/tests/observability/core/test_output_scope.py +++ b/tests/observability/core/test_output_scope.py @@ -91,22 +91,18 @@ def test_output_scope_creates_span_with_messages(self): self.assertIn("First message", output_value) self.assertIn("Second message", output_value) - def test_record_output_messages_appends(self): - """Test record_output_messages appends to accumulated messages.""" + def test_record_output_messages_overwrites(self): + """Test record_output_messages overwrites previously set messages.""" response = Response(messages=["Initial"]) with OutputScope.start(Request(), response, self.agent_details) as scope: - scope.record_output_messages(["Appended 1"]) - scope.record_output_messages(["Appended 2", "Appended 3"]) + scope.record_output_messages(["Final message"]) _, attributes = self._get_last_span() output_value = attributes[GEN_AI_OUTPUT_MESSAGES_KEY] - # All messages should be present (initial + all appended) - self.assertIn("Initial", output_value) - self.assertIn("Appended 1", output_value) - self.assertIn("Appended 2", output_value) - self.assertIn("Appended 3", output_value) + self.assertNotIn("Initial", output_value) + self.assertIn("Final message", output_value) def test_output_scope_with_parent_context(self): """Test OutputScope uses parent_context to link span to parent context.""" diff --git a/tests/observability/core/test_output_scope_bounded.py b/tests/observability/core/test_output_scope_bounded.py index 93718684..dcc292ea 100644 --- a/tests/observability/core/test_output_scope_bounded.py +++ b/tests/observability/core/test_output_scope_bounded.py @@ -1,95 +1,83 @@ # Copyright (c) Microsoft Corporation. # Licensed under the MIT License. -"""Tests for bounded output messages in OutputScope.""" +"""Tests for OutputScope record_output_messages overwrite behavior.""" +import json import unittest from unittest.mock import MagicMock, patch +from microsoft_agents_a365.observability.core.constants import GEN_AI_OUTPUT_MESSAGES_KEY +from microsoft_agents_a365.observability.core.models.messages import ( + A365_MESSAGE_SCHEMA_VERSION, + MessageRole, + OutputMessage, + OutputMessages, + TextPart, +) from microsoft_agents_a365.observability.core.spans_scopes.output_scope import OutputScope -class TestOutputScopeBounded(unittest.TestCase): - """Tests that OutputScope._output_messages list is properly bounded.""" +class TestOutputScopeOverwrite(unittest.TestCase): + """Tests that OutputScope.record_output_messages overwrites (not accumulates).""" - def _make_scope(self, initial_messages: list[str] | None = None) -> OutputScope: + def _make_scope(self) -> OutputScope: """Create an OutputScope with mocked dependencies.""" - agent_details = MagicMock() - agent_details.agent_id = "test-agent" - agent_details.agent_name = "Test Agent" - agent_details.agent_description = None - agent_details.platform_id = None - agent_details.icon_uri = None - agent_details.agentic_user_id = None - agent_details.agentic_user_email = None - agent_details.agent_blueprint_id = None - - response = MagicMock() - response.messages = initial_messages or ["hello"] - with patch.object(OutputScope, "__init__", lambda self, *a, **kw: None): scope = OutputScope.__new__(OutputScope) - scope._output_messages = list(response.messages) scope.set_tag_maybe = MagicMock() - + scope._span = MagicMock() return scope - def test_max_output_messages_default(self): - """Default _MAX_OUTPUT_MESSAGES should be 5000.""" - self.assertEqual(OutputScope._MAX_OUTPUT_MESSAGES, 5000) - - def test_record_output_messages_within_limit(self): - """Messages under the limit should not be truncated.""" - scope = self._make_scope(["initial"]) - scope.record_output_messages(["msg1", "msg2", "msg3"]) - self.assertEqual(len(scope._output_messages), 4) - self.assertEqual(scope._output_messages, ["initial", "msg1", "msg2", "msg3"]) - - def test_record_output_messages_exceeds_limit(self): - """Messages exceeding the limit should be truncated to keep newest.""" - scope = self._make_scope([]) - original_max = OutputScope._MAX_OUTPUT_MESSAGES - try: - OutputScope._MAX_OUTPUT_MESSAGES = 10 - - # Add 15 messages - scope.record_output_messages([f"msg_{i}" for i in range(15)]) - - # Should be capped at 10 (keeping the newest) - self.assertEqual(len(scope._output_messages), 10) - # Oldest 5 should be gone, newest 10 should remain - self.assertEqual(scope._output_messages[0], "msg_5") - self.assertEqual(scope._output_messages[-1], "msg_14") - finally: - OutputScope._MAX_OUTPUT_MESSAGES = original_max - - def test_record_output_messages_multiple_calls_capped(self): - """Multiple calls to record_output_messages should stay bounded.""" - scope = self._make_scope([]) - original_max = OutputScope._MAX_OUTPUT_MESSAGES - try: - OutputScope._MAX_OUTPUT_MESSAGES = 5 - - for batch in range(4): - scope.record_output_messages([f"batch{batch}_msg{i}" for i in range(3)]) - - # Total of 12 messages added in 4 batches, should be capped at 5 - self.assertLessEqual(len(scope._output_messages), 5) - # Latest messages should be from the last batches - self.assertIn("batch3_msg2", scope._output_messages) - finally: - OutputScope._MAX_OUTPUT_MESSAGES = original_max - - def test_record_output_messages_exactly_at_limit(self): - """Messages exactly at the limit should not be truncated.""" - scope = self._make_scope([]) - original_max = OutputScope._MAX_OUTPUT_MESSAGES - try: - OutputScope._MAX_OUTPUT_MESSAGES = 5 - scope.record_output_messages([f"msg_{i}" for i in range(5)]) - self.assertEqual(len(scope._output_messages), 5) - finally: - OutputScope._MAX_OUTPUT_MESSAGES = original_max + def test_record_overwrites_with_strings(self): + """Calling record_output_messages with strings sets the attribute.""" + scope = self._make_scope() + scope.record_output_messages(["Final response"]) + + scope.set_tag_maybe.assert_called() + call_args = scope.set_tag_maybe.call_args + self.assertEqual(call_args[0][0], GEN_AI_OUTPUT_MESSAGES_KEY) + parsed = json.loads(call_args[0][1]) + self.assertEqual(parsed["version"], A365_MESSAGE_SCHEMA_VERSION) + self.assertEqual(parsed["messages"][0]["parts"][0]["content"], "Final response") + + def test_record_overwrites_with_structured(self): + """Calling record_output_messages with OutputMessages sets the attribute.""" + scope = self._make_scope() + wrapper = OutputMessages( + messages=[ + OutputMessage( + role=MessageRole.ASSISTANT, + parts=[TextPart(content="Structured")], + ) + ] + ) + scope.record_output_messages(wrapper) + + call_args = scope.set_tag_maybe.call_args + parsed = json.loads(call_args[0][1]) + self.assertEqual(parsed["messages"][0]["parts"][0]["content"], "Structured") + + def test_record_overwrites_with_dict(self): + """Calling record_output_messages with dict sets JSON directly.""" + scope = self._make_scope() + scope.record_output_messages({"result": "tool output"}) + + call_args = scope.set_tag_maybe.call_args + parsed = json.loads(call_args[0][1]) + self.assertEqual(parsed["result"], "tool output") + + def test_second_call_replaces_first(self): + """Second call to record_output_messages replaces the first.""" + scope = self._make_scope() + scope.record_output_messages(["First"]) + scope.record_output_messages(["Second"]) + + # Last call should have "Second", not "First" + call_args = scope.set_tag_maybe.call_args + parsed = json.loads(call_args[0][1]) + self.assertNotIn("First", call_args[0][1]) + self.assertEqual(parsed["messages"][0]["parts"][0]["content"], "Second") if __name__ == "__main__": diff --git a/tests/observability/core/test_scope_messages.py b/tests/observability/core/test_scope_messages.py new file mode 100644 index 00000000..eb75cb90 --- /dev/null +++ b/tests/observability/core/test_scope_messages.py @@ -0,0 +1,368 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. + +"""Tests for structured message recording on InvokeAgentScope, InferenceScope, and OutputScope.""" + +import json +import os +import sys +import unittest +from pathlib import Path + +import pytest +from microsoft_agents_a365.observability.core import ( + AgentDetails, + InferenceCallDetails, + InferenceOperationType, + InferenceScope, + InvokeAgentScope, + InvokeAgentScopeDetails, + Request, + ServiceEndpoint, + configure, + get_tracer_provider, +) +from microsoft_agents_a365.observability.core.config import _telemetry_manager +from microsoft_agents_a365.observability.core.constants import ( + GEN_AI_INPUT_MESSAGES_KEY, + GEN_AI_OUTPUT_MESSAGES_KEY, +) +from microsoft_agents_a365.observability.core.models.messages import ( + A365_MESSAGE_SCHEMA_VERSION, + ChatMessage, + FinishReason, + InputMessages, + MessageRole, + OutputMessage, + OutputMessages, + ReasoningPart, + TextPart, +) +from microsoft_agents_a365.observability.core.models.response import Response +from microsoft_agents_a365.observability.core.opentelemetry_scope import OpenTelemetryScope +from microsoft_agents_a365.observability.core.spans_scopes.output_scope import OutputScope +from opentelemetry.sdk.trace.export import SimpleSpanProcessor +from opentelemetry.sdk.trace.export.in_memory_span_exporter import InMemorySpanExporter + + +class ScopeMessageTestBase(unittest.TestCase): + """Shared setup for scope message tests.""" + + @classmethod + def setUpClass(cls): + os.environ["ENABLE_A365_OBSERVABILITY"] = "true" + configure(service_name="test-scope-messages", service_namespace="test") + + cls.agent_details = AgentDetails( + agent_id="test-agent-123", + agent_name="Test Agent", + ) + cls.invoke_scope_details = InvokeAgentScopeDetails( + endpoint=ServiceEndpoint(hostname="example.com", port=443), + ) + cls.inference_details = InferenceCallDetails( + operationName=InferenceOperationType.CHAT, + model="gpt-4o", + providerName="openai", + ) + + def setUp(self): + super().setUp() + _telemetry_manager._tracer_provider = None + _telemetry_manager._span_processors = {} + OpenTelemetryScope._tracer = None + configure(service_name="test-scope-messages", service_namespace="test") + + self.span_exporter = InMemorySpanExporter() + tracer_provider = get_tracer_provider() + tracer_provider.add_span_processor(SimpleSpanProcessor(self.span_exporter)) + + def tearDown(self): + super().tearDown() + self.span_exporter.clear() + + def _get_last_span_attrs(self) -> dict: + spans = self.span_exporter.get_finished_spans() + self.assertTrue(spans, "Expected at least one span") + return dict(getattr(spans[-1], "attributes", {}) or {}) + + def _parse_messages(self, attr_value: str) -> dict: + parsed = json.loads(attr_value) + self.assertEqual(parsed["version"], A365_MESSAGE_SCHEMA_VERSION) + return parsed + + +class TestInvokeAgentScopeMessages(ScopeMessageTestBase): + """Tests for InvokeAgentScope message recording.""" + + def test_record_input_messages_with_strings(self): + """Plain string list should be auto-wrapped into versioned format.""" + scope = InvokeAgentScope.start(Request(), self.invoke_scope_details, self.agent_details) + scope.record_input_messages(["What is GDPR?"]) + scope.dispose() + + attrs = self._get_last_span_attrs() + parsed = self._parse_messages(attrs[GEN_AI_INPUT_MESSAGES_KEY]) + self.assertEqual(len(parsed["messages"]), 1) + self.assertEqual(parsed["messages"][0]["role"], "user") + self.assertEqual(parsed["messages"][0]["parts"][0]["content"], "What is GDPR?") + + def test_record_input_messages_with_structured(self): + """Versioned InputMessages wrapper should be serialized as-is.""" + wrapper = InputMessages( + messages=[ + ChatMessage( + role=MessageRole.SYSTEM, + parts=[TextPart(content="You are a compliance assistant.")], + ), + ChatMessage( + role=MessageRole.USER, + parts=[TextPart(content="What are data retention policies?")], + ), + ] + ) + scope = InvokeAgentScope.start(Request(), self.invoke_scope_details, self.agent_details) + scope.record_input_messages(wrapper) + scope.dispose() + + attrs = self._get_last_span_attrs() + parsed = self._parse_messages(attrs[GEN_AI_INPUT_MESSAGES_KEY]) + self.assertEqual(len(parsed["messages"]), 2) + self.assertEqual(parsed["messages"][0]["role"], "system") + self.assertEqual(parsed["messages"][1]["role"], "user") + + def test_record_output_messages_with_strings(self): + scope = InvokeAgentScope.start(Request(), self.invoke_scope_details, self.agent_details) + scope.record_output_messages(["GDPR requires data minimization."]) + scope.dispose() + + attrs = self._get_last_span_attrs() + parsed = self._parse_messages(attrs[GEN_AI_OUTPUT_MESSAGES_KEY]) + self.assertEqual(parsed["messages"][0]["role"], "assistant") + self.assertEqual( + parsed["messages"][0]["parts"][0]["content"], + "GDPR requires data minimization.", + ) + + def test_record_output_messages_with_structured(self): + wrapper = OutputMessages( + messages=[ + OutputMessage( + role=MessageRole.ASSISTANT, + parts=[ + ReasoningPart(content="Checking Article 5(1)(e)"), + TextPart(content="Based on GDPR..."), + ], + finish_reason=FinishReason.STOP.value, + ) + ] + ) + scope = InvokeAgentScope.start(Request(), self.invoke_scope_details, self.agent_details) + scope.record_output_messages(wrapper) + scope.dispose() + + attrs = self._get_last_span_attrs() + parsed = self._parse_messages(attrs[GEN_AI_OUTPUT_MESSAGES_KEY]) + msg = parsed["messages"][0] + self.assertEqual(msg["finish_reason"], "stop") + self.assertEqual(len(msg["parts"]), 2) + self.assertEqual(msg["parts"][0]["type"], "reasoning") + self.assertEqual(msg["parts"][1]["type"], "text") + + def test_record_response_wraps_string(self): + """record_response(str) should produce versioned output messages.""" + scope = InvokeAgentScope.start(Request(), self.invoke_scope_details, self.agent_details) + scope.record_response("Simple response") + scope.dispose() + + attrs = self._get_last_span_attrs() + parsed = self._parse_messages(attrs[GEN_AI_OUTPUT_MESSAGES_KEY]) + self.assertEqual(parsed["messages"][0]["parts"][0]["content"], "Simple response") + + def test_request_content_string_auto_wrapped(self): + """Request.content as plain string should be wrapped into versioned format.""" + request = Request(content="What is GDPR?") + scope = InvokeAgentScope.start(request, self.invoke_scope_details, self.agent_details) + scope.dispose() + + attrs = self._get_last_span_attrs() + parsed = self._parse_messages(attrs[GEN_AI_INPUT_MESSAGES_KEY]) + self.assertEqual(parsed["messages"][0]["role"], "user") + self.assertIn("What is GDPR?", parsed["messages"][0]["parts"][0]["content"]) + + def test_request_content_structured_input(self): + """Request.content as InputMessages should be serialized directly.""" + wrapper = InputMessages( + messages=[ChatMessage(role=MessageRole.USER, parts=[TextPart(content="Hello")])] + ) + request = Request(content=wrapper) + scope = InvokeAgentScope.start(request, self.invoke_scope_details, self.agent_details) + scope.dispose() + + attrs = self._get_last_span_attrs() + parsed = self._parse_messages(attrs[GEN_AI_INPUT_MESSAGES_KEY]) + self.assertEqual(parsed["messages"][0]["parts"][0]["content"], "Hello") + + +class TestInferenceScopeMessages(ScopeMessageTestBase): + """Tests for InferenceScope message recording.""" + + def test_record_input_messages_with_strings(self): + scope = InferenceScope.start(Request(), self.inference_details, self.agent_details) + scope.record_input_messages(["Explain quantum computing"]) + scope.dispose() + + attrs = self._get_last_span_attrs() + parsed = self._parse_messages(attrs[GEN_AI_INPUT_MESSAGES_KEY]) + self.assertEqual(parsed["messages"][0]["role"], "user") + self.assertEqual(parsed["messages"][0]["parts"][0]["content"], "Explain quantum computing") + + def test_record_input_messages_with_structured(self): + wrapper = InputMessages( + messages=[ + ChatMessage( + role=MessageRole.SYSTEM, + parts=[TextPart(content="You are helpful.")], + ), + ChatMessage( + role=MessageRole.USER, + parts=[TextPart(content="Question")], + ), + ] + ) + scope = InferenceScope.start(Request(), self.inference_details, self.agent_details) + scope.record_input_messages(wrapper) + scope.dispose() + + attrs = self._get_last_span_attrs() + parsed = self._parse_messages(attrs[GEN_AI_INPUT_MESSAGES_KEY]) + self.assertEqual(len(parsed["messages"]), 2) + + def test_record_output_messages_with_strings(self): + scope = InferenceScope.start(Request(), self.inference_details, self.agent_details) + scope.record_output_messages(["Quantum computing uses qubits."]) + scope.dispose() + + attrs = self._get_last_span_attrs() + parsed = self._parse_messages(attrs[GEN_AI_OUTPUT_MESSAGES_KEY]) + self.assertEqual(parsed["messages"][0]["role"], "assistant") + + def test_record_output_messages_with_structured(self): + wrapper = OutputMessages( + messages=[ + OutputMessage( + role=MessageRole.ASSISTANT, + parts=[TextPart(content="Answer")], + finish_reason=FinishReason.STOP.value, + ) + ] + ) + scope = InferenceScope.start(Request(), self.inference_details, self.agent_details) + scope.record_output_messages(wrapper) + scope.dispose() + + attrs = self._get_last_span_attrs() + parsed = self._parse_messages(attrs[GEN_AI_OUTPUT_MESSAGES_KEY]) + self.assertEqual(parsed["messages"][0]["finish_reason"], "stop") + + def test_request_content_string_auto_wrapped(self): + request = Request(content="Test content") + scope = InferenceScope.start(request, self.inference_details, self.agent_details) + scope.dispose() + + attrs = self._get_last_span_attrs() + parsed = self._parse_messages(attrs[GEN_AI_INPUT_MESSAGES_KEY]) + self.assertEqual(parsed["messages"][0]["parts"][0]["content"], "Test content") + + +class TestOutputScopeMessages(ScopeMessageTestBase): + """Tests for OutputScope structured message support.""" + + def test_initial_string_messages_wrapped(self): + """Response with plain strings should produce versioned output.""" + response = Response(messages=["First", "Second"]) + with OutputScope.start(Request(), response, self.agent_details): + pass + + attrs = self._get_last_span_attrs() + parsed = self._parse_messages(attrs[GEN_AI_OUTPUT_MESSAGES_KEY]) + self.assertEqual(len(parsed["messages"]), 2) + self.assertEqual(parsed["messages"][0]["role"], "assistant") + self.assertEqual(parsed["messages"][0]["parts"][0]["content"], "First") + self.assertEqual(parsed["messages"][1]["parts"][0]["content"], "Second") + + def test_initial_structured_messages(self): + """Response with OutputMessages should be serialized directly.""" + wrapper = OutputMessages( + messages=[ + OutputMessage( + role=MessageRole.ASSISTANT, + parts=[TextPart(content="Structured output")], + finish_reason=FinishReason.STOP.value, + ) + ] + ) + response = Response(messages=wrapper) + with OutputScope.start(Request(), response, self.agent_details): + pass + + attrs = self._get_last_span_attrs() + parsed = self._parse_messages(attrs[GEN_AI_OUTPUT_MESSAGES_KEY]) + self.assertEqual(parsed["messages"][0]["finish_reason"], "stop") + + def test_record_overwrites_string_messages(self): + """record_output_messages with strings overwrites previous messages.""" + response = Response(messages=["Initial"]) + with OutputScope.start(Request(), response, self.agent_details) as scope: + scope.record_output_messages(["Replacement"]) + + attrs = self._get_last_span_attrs() + parsed = self._parse_messages(attrs[GEN_AI_OUTPUT_MESSAGES_KEY]) + self.assertEqual(len(parsed["messages"]), 1) + self.assertNotIn("Initial", attrs[GEN_AI_OUTPUT_MESSAGES_KEY]) + self.assertEqual(parsed["messages"][0]["parts"][0]["content"], "Replacement") + + def test_record_overwrites_with_structured(self): + """record_output_messages with OutputMessages overwrites previous messages.""" + response = Response(messages=["Initial"]) + replacement = OutputMessages( + messages=[ + OutputMessage( + role=MessageRole.ASSISTANT, + parts=[TextPart(content="Structured replacement")], + finish_reason=FinishReason.STOP.value, + ) + ] + ) + with OutputScope.start(Request(), response, self.agent_details) as scope: + scope.record_output_messages(replacement) + + attrs = self._get_last_span_attrs() + parsed = self._parse_messages(attrs[GEN_AI_OUTPUT_MESSAGES_KEY]) + self.assertEqual(len(parsed["messages"]), 1) + self.assertEqual(parsed["messages"][0]["finish_reason"], "stop") + + def test_record_overwrites_with_dict(self): + """record_output_messages with dict sets tool result directly.""" + response = Response(messages=["Initial"]) + with OutputScope.start(Request(), response, self.agent_details) as scope: + scope.record_output_messages({"result": "tool output"}) + + attrs = self._get_last_span_attrs() + parsed = json.loads(attrs[GEN_AI_OUTPUT_MESSAGES_KEY]) + self.assertEqual(parsed["result"], "tool output") + + def test_no_record_keeps_initial(self): + """If record_output_messages is not called, initial value remains.""" + response = Response(messages=["Only initial"]) + with OutputScope.start(Request(), response, self.agent_details): + pass + + attrs = self._get_last_span_attrs() + parsed = self._parse_messages(attrs[GEN_AI_OUTPUT_MESSAGES_KEY]) + self.assertEqual(len(parsed["messages"]), 1) + self.assertEqual(parsed["messages"][0]["parts"][0]["content"], "Only initial") + + +if __name__ == "__main__": + sys.exit(pytest.main([str(Path(__file__))] + sys.argv[1:])) diff --git a/tests/observability/core/test_trace_context_propagation.py b/tests/observability/core/test_trace_context_propagation.py index 28027ea5..349452fb 100644 --- a/tests/observability/core/test_trace_context_propagation.py +++ b/tests/observability/core/test_trace_context_propagation.py @@ -5,7 +5,6 @@ import os import unittest -from urllib.parse import urlparse import pytest from microsoft_agents_a365.observability.core import ( @@ -17,6 +16,7 @@ InvokeAgentScope, InvokeAgentScopeDetails, Request, + ServiceEndpoint, SpanDetails, ToolCallDetails, configure, @@ -250,7 +250,7 @@ def test_invoke_agent_scope_with_parent_context(self): parent_context = extract_context_from_headers({"traceparent": traceparent}) invoke_scope_details = InvokeAgentScopeDetails( - endpoint=urlparse("https://example.com/agent"), + endpoint=ServiceEndpoint(hostname="example.com", port=443), ) with InvokeAgentScope.start( diff --git a/tests/usage_example.py b/tests/usage_example.py index ea6537b6..a0285944 100644 --- a/tests/usage_example.py +++ b/tests/usage_example.py @@ -2,7 +2,6 @@ # Licensed under the MIT License. import os -from urllib.parse import urlparse def main(): @@ -18,6 +17,7 @@ def main(): InvokeAgentScope, InvokeAgentScopeDetails, Request, + ServiceEndpoint, configure, ) @@ -67,7 +67,7 @@ def main(): # Create invoke agent scope details (aligned with .NET SDK) invoke_scope_details = InvokeAgentScopeDetails( - endpoint=urlparse("https://agents.company.com:8080/inventory"), + endpoint=ServiceEndpoint(hostname="agents.company.com", port=8080), ) # Create request for the invocation