-
Notifications
You must be signed in to change notification settings - Fork 11
gRPC OTel Receiver support #98
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
krisztianfekete
merged 8 commits into
agentevals-dev:main
from
ajimenez1503:feat/grpc-receiver-support
Apr 3, 2026
Merged
Changes from all commits
Commits
Show all changes
8 commits
Select commit
Hold shift + click to select a range
6f64838
gRPC OTel Receiver support
antonjim-te a846daf
improvements
antonjim-te 7a1231d
Apply feedback
antonjim-te f12618f
Apply feedback
antonjim-te 4349e00
Fix linter
antonjim-te 69f6a83
Merge branch 'main' into feat/grpc-receiver-support
ajimenez1503 0018be2
Apply feedback
antonjim-te 606d42c
Apply feedback
antonjim-te File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Some comments aren't visible on the classic Files Changed page.
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,6 +1,6 @@ | ||
| apiVersion: v2 | ||
| name: agentevals | ||
| description: agentevals web UI, OTLP HTTP receiver, and MCP (Streamable HTTP) | ||
| description: agentevals web UI, OTLP HTTP+gRPC receivers, and MCP (Streamable HTTP) | ||
| type: application | ||
| version: 0.1.0 | ||
| appVersion: "0.5.2" |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,98 @@ | ||
| """OTLP gRPC receiver services for traces and logs. | ||
|
|
||
| Receives standard OTLP/gRPC Export requests on port 4317 and forwards them | ||
| into the same StreamingTraceManager pipeline used by OTLP/HTTP routes. | ||
| """ | ||
|
|
||
| from __future__ import annotations | ||
|
|
||
| import logging | ||
| from typing import TYPE_CHECKING | ||
|
|
||
| from google.protobuf.json_format import MessageToDict | ||
| from opentelemetry.proto.collector.logs.v1 import logs_service_pb2, logs_service_pb2_grpc | ||
| from opentelemetry.proto.collector.trace.v1 import trace_service_pb2, trace_service_pb2_grpc | ||
|
|
||
| from .otlp_processing import fix_protobuf_id_fields, process_logs, process_traces | ||
|
|
||
| if TYPE_CHECKING: | ||
| from grpc import aio | ||
|
|
||
| from ..streaming.ws_server import StreamingTraceManager | ||
|
|
||
| logger = logging.getLogger(__name__) | ||
|
|
||
| GRPC_SHUTDOWN_GRACE_SECONDS = 5 | ||
| DEFAULT_GRPC_MAX_CONCURRENT_RPCS = 32 | ||
| DEFAULT_GRPC_MAX_MESSAGE_BYTES = 8 * 1024 * 1024 | ||
|
|
||
|
|
||
| class OtlpTraceService(trace_service_pb2_grpc.TraceServiceServicer): | ||
| """OTLP TraceService gRPC implementation.""" | ||
|
|
||
| def __init__(self, manager: StreamingTraceManager): | ||
| self._manager = manager | ||
|
|
||
| async def Export(self, request, context): # noqa: N802 (gRPC method name) | ||
| body = MessageToDict(request, preserving_proto_field_name=False) | ||
| fix_protobuf_id_fields(body) | ||
| await process_traces(body, self._manager) | ||
| return trace_service_pb2.ExportTraceServiceResponse() | ||
|
|
||
|
|
||
| class OtlpLogsService(logs_service_pb2_grpc.LogsServiceServicer): | ||
| """OTLP LogsService gRPC implementation.""" | ||
|
|
||
| def __init__(self, manager: StreamingTraceManager): | ||
| self._manager = manager | ||
|
|
||
| async def Export(self, request, context): # noqa: N802 (gRPC method name) | ||
| body = MessageToDict(request, preserving_proto_field_name=False) | ||
| fix_protobuf_id_fields(body) | ||
| await process_logs(body, self._manager) | ||
| return logs_service_pb2.ExportLogsServiceResponse() | ||
|
|
||
|
|
||
| def create_otlp_grpc_server( | ||
| host: str, | ||
| port: int, | ||
| manager: StreamingTraceManager, | ||
| *, | ||
| max_concurrent_rpcs: int = DEFAULT_GRPC_MAX_CONCURRENT_RPCS, | ||
| max_message_bytes: int = DEFAULT_GRPC_MAX_MESSAGE_BYTES, | ||
| ) -> aio.Server: | ||
| """Create an OTLP gRPC server bound to host:port.""" | ||
| try: | ||
| import grpc | ||
| except ImportError as exc: # pragma: no cover - environment-dependent | ||
| raise RuntimeError("OTLP gRPC receiver requires grpcio. Install with: pip install grpcio") from exc | ||
|
|
||
| server = grpc.aio.server( | ||
| compression=grpc.Compression.Gzip, | ||
| maximum_concurrent_rpcs=max_concurrent_rpcs, | ||
| options=[ | ||
| ("grpc.max_receive_message_length", max_message_bytes), | ||
| ("grpc.max_send_message_length", max_message_bytes), | ||
| ], | ||
| ) | ||
| trace_service_pb2_grpc.add_TraceServiceServicer_to_server(OtlpTraceService(manager), server) | ||
| logs_service_pb2_grpc.add_LogsServiceServicer_to_server(OtlpLogsService(manager), server) | ||
|
|
||
| listen_addr = f"{host}:{port}" | ||
| bound_port = server.add_insecure_port(listen_addr) | ||
| if bound_port == 0: | ||
| raise RuntimeError(f"Failed to bind OTLP gRPC receiver to {listen_addr}") | ||
|
|
||
| logger.info( | ||
| "OTLP gRPC receiver configured at %s (gzip enabled, max_concurrent_rpcs=%d, max_msg=%d)", | ||
| listen_addr, | ||
| max_concurrent_rpcs, | ||
| max_message_bytes, | ||
| ) | ||
| return server | ||
|
|
||
|
|
||
| async def stop_otlp_grpc_server(server: aio.Server, *, force: bool = False) -> None: | ||
| """Stop the OTLP gRPC server with graceful or forced semantics.""" | ||
| grace = 0 if force else GRPC_SHUTDOWN_GRACE_SECONDS | ||
| await server.stop(grace=grace) |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you please update the docs, so it's clear when to use what port and pipeline configs? We have a couple of leftovers from the previous iterations.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I tried improving the docs.
What do you want me to make more clear? Thanks :)