Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions src/IceRpc.Protobuf/RpcMethods/Internal/PipeReaderExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,23 @@ internal static async ValueTask<T> DecodeProtobufMessageAsync<T>(
cancellationToken).ConfigureAwait(false);

Debug.Assert(message is not null);

// A unary payload must contain exactly one message; any trailing bytes indicate a framing error.
ReadResult readResult = await reader.ReadAsync(cancellationToken).ConfigureAwait(false);
// We never call CancelPendingRead; an interceptor or middleware can but it's not correct.
if (readResult.IsCanceled)
{
throw new InvalidOperationException("Unexpected call to CancelPendingRead.");
}
bool hasTrailingBytes = !readResult.Buffer.IsEmpty;
reader.AdvanceTo(readResult.Buffer.End);
if (hasTrailingBytes)
{
throw new InvalidDataException(
"The payload contains unexpected trailing bytes after the Protobuf message.");
}
Debug.Assert(readResult.IsCompleted);

return message;
}

Expand Down
48 changes: 48 additions & 0 deletions tests/IceRpc.Protobuf.Tests/PipeReaderExtensionsTests.cs
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
// Copyright (c) ZeroC, Inc.

using Google.Protobuf;
using Google.Protobuf.WellKnownTypes;
using IceRpc.Protobuf.RpcMethods.Internal;
using NUnit.Framework;
using System.Buffers;
Copy link

Copilot AI Apr 24, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

using System.Buffers; is not used in this test file. The repo has TreatWarningsAsErrors=true, so this unused using will fail the build; please remove it (or use the type that requires it).

Suggested change
using System.Buffers;

Copilot uses AI. Check for mistakes.
using System.Buffers.Binary;
using System.IO.Pipelines;

namespace IceRpc.Protobuf.Tests;
Expand All @@ -26,4 +29,49 @@ await pipeReader.DecodeProtobufMessageAsync(
CancellationToken.None));
pipeReader.Complete();
}

[Test]
public void Decode_message_throws_invalid_data_exception_when_payload_has_trailing_bytes()
{
// Arrange
var pipe = new Pipe();
WriteLengthPrefixedMessage(pipe.Writer, new StringValue { Value = "hello" });
pipe.Writer.Write(new byte[] { 0xDE, 0xAD, 0xBE, 0xEF });
pipe.Writer.Complete();

// Act & Assert
Assert.ThrowsAsync<InvalidDataException>(async () =>
await pipe.Reader.DecodeProtobufMessageAsync(
StringValue.Parser,
maxMessageLength: 1024,
CancellationToken.None));
pipe.Reader.Complete();
}

[Test]
public void Decode_message_throws_invalid_data_exception_when_payload_has_concatenated_messages()
{
// Arrange
var pipe = new Pipe();
WriteLengthPrefixedMessage(pipe.Writer, new StringValue { Value = "hello" });
WriteLengthPrefixedMessage(pipe.Writer, new StringValue { Value = "world" });
pipe.Writer.Complete();

// Act & Assert
Assert.ThrowsAsync<InvalidDataException>(async () =>
await pipe.Reader.DecodeProtobufMessageAsync(
StringValue.Parser,
maxMessageLength: 1024,
CancellationToken.None));
pipe.Reader.Complete();
}

private static void WriteLengthPrefixedMessage(PipeWriter writer, IMessage message)
{
writer.Write(new byte[] { 0 }); // Not compressed
Span<byte> lengthBytes = writer.GetSpan(4);
BinaryPrimitives.WriteInt32BigEndian(lengthBytes, message.CalculateSize());
writer.Advance(4);
message.WriteTo(writer);
}
}
Loading