diff --git a/src/IceRpc.Protobuf/RpcMethods/Internal/PipeReaderExtensions.cs b/src/IceRpc.Protobuf/RpcMethods/Internal/PipeReaderExtensions.cs index 86ce27f48a..5e03495283 100644 --- a/src/IceRpc.Protobuf/RpcMethods/Internal/PipeReaderExtensions.cs +++ b/src/IceRpc.Protobuf/RpcMethods/Internal/PipeReaderExtensions.cs @@ -31,6 +31,23 @@ internal static async ValueTask DecodeProtobufMessageAsync( cancellationToken).ConfigureAwait(false); Debug.Assert(message is not null); + + // A unary payload must contain exactly one message; any trailing bytes indicate a framing error. + ReadResult readResult = await reader.ReadAsync(cancellationToken).ConfigureAwait(false); + // We never call CancelPendingRead; an interceptor or middleware can but it's not correct. + if (readResult.IsCanceled) + { + throw new InvalidOperationException("Unexpected call to CancelPendingRead."); + } + bool hasTrailingBytes = !readResult.Buffer.IsEmpty; + reader.AdvanceTo(readResult.Buffer.End); + if (hasTrailingBytes) + { + throw new InvalidDataException( + "The payload contains unexpected trailing bytes after the Protobuf message."); + } + Debug.Assert(readResult.IsCompleted); + return message; } diff --git a/tests/IceRpc.Protobuf.Tests/PipeReaderExtensionsTests.cs b/tests/IceRpc.Protobuf.Tests/PipeReaderExtensionsTests.cs index 26a6704204..2aaff7d7a2 100644 --- a/tests/IceRpc.Protobuf.Tests/PipeReaderExtensionsTests.cs +++ b/tests/IceRpc.Protobuf.Tests/PipeReaderExtensionsTests.cs @@ -1,8 +1,11 @@ // Copyright (c) ZeroC, Inc. +using Google.Protobuf; using Google.Protobuf.WellKnownTypes; using IceRpc.Protobuf.RpcMethods.Internal; using NUnit.Framework; +using System.Buffers; +using System.Buffers.Binary; using System.IO.Pipelines; namespace IceRpc.Protobuf.Tests; @@ -26,4 +29,49 @@ await pipeReader.DecodeProtobufMessageAsync( CancellationToken.None)); pipeReader.Complete(); } + + [Test] + public void Decode_message_throws_invalid_data_exception_when_payload_has_trailing_bytes() + { + // Arrange + var pipe = new Pipe(); + WriteLengthPrefixedMessage(pipe.Writer, new StringValue { Value = "hello" }); + pipe.Writer.Write(new byte[] { 0xDE, 0xAD, 0xBE, 0xEF }); + pipe.Writer.Complete(); + + // Act & Assert + Assert.ThrowsAsync(async () => + await pipe.Reader.DecodeProtobufMessageAsync( + StringValue.Parser, + maxMessageLength: 1024, + CancellationToken.None)); + pipe.Reader.Complete(); + } + + [Test] + public void Decode_message_throws_invalid_data_exception_when_payload_has_concatenated_messages() + { + // Arrange + var pipe = new Pipe(); + WriteLengthPrefixedMessage(pipe.Writer, new StringValue { Value = "hello" }); + WriteLengthPrefixedMessage(pipe.Writer, new StringValue { Value = "world" }); + pipe.Writer.Complete(); + + // Act & Assert + Assert.ThrowsAsync(async () => + await pipe.Reader.DecodeProtobufMessageAsync( + StringValue.Parser, + maxMessageLength: 1024, + CancellationToken.None)); + pipe.Reader.Complete(); + } + + private static void WriteLengthPrefixedMessage(PipeWriter writer, IMessage message) + { + writer.Write(new byte[] { 0 }); // Not compressed + Span lengthBytes = writer.GetSpan(4); + BinaryPrimitives.WriteInt32BigEndian(lengthBytes, message.CalculateSize()); + writer.Advance(4); + message.WriteTo(writer); + } }