Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 62 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -50,3 +50,4 @@ tracing-full = ["opentelemetry", "opentelemetry_sdk", "opentelemetry-stdout", "t

[dev-dependencies]
tempfile = "3.27.0"
proptest = "1"
129 changes: 104 additions & 25 deletions crates/common/src/protobuf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,29 +17,49 @@ pub const PROTOBUF_MAGIC: &[u8] = &[0x6b, 0x38, 0x73, 0x00];
pub const CONTENT_TYPE_PROTOBUF: &str = "application/vnd.kubernetes.protobuf";
pub const CONTENT_TYPE_PROTOBUF_STREAM: &str = "application/vnd.kubernetes.protobuf;stream=watch";

/// Unknown wraps objects with type metadata for protobuf serialization
/// This mirrors k8s.io/apimachinery/pkg/runtime.Unknown
/// TypeMeta as a nested protobuf message. Sits inside `Unknown.typeMeta`
/// at field 1; itself has `apiVersion=1`, `kind=2`. Mirrors
/// `k8s.io/apimachinery/pkg/runtime/generated.proto::TypeMeta`.
#[derive(Clone, PartialEq, Message)]
pub struct Unknown {
/// TypeMeta is embedded in line 1
/// apiVersion field from TypeMeta
pub struct UnknownTypeMeta {
#[prost(string, tag = "1")]
pub api_version: String,

/// kind field from TypeMeta
#[prost(string, tag = "2")]
pub kind: String,
}

/// Unknown wraps objects with type metadata for protobuf serialization.
///
/// Mirrors `k8s.io/apimachinery/pkg/runtime.Unknown` exactly:
///
/// ```text
/// message Unknown {
/// optional TypeMeta typeMeta = 1;
/// optional bytes raw = 2;
/// optional string contentEncoding = 3;
/// optional string contentType = 4;
/// }
/// ```
///
/// Before 2026-05-24 this struct flattened TypeMeta in at tags 1-2 and
/// shifted the other fields to 3-5. Encoder and our own decoder agreed,
/// so internal round-trips passed — but any upstream client (kubectl,
/// hydrophone, client-go) decoding the envelope hit
/// `proto: illegal wireType 6` reading the apiVersion bytes as a nested
/// TypeMeta message (run 26360429763, post-#766 Pod CREATE).
#[derive(Clone, PartialEq, Message)]
pub struct Unknown {
#[prost(message, optional, tag = "1")]
pub type_meta: Option<UnknownTypeMeta>,

/// Raw will hold the complete serialized object in protobuf format
#[prost(bytes, tag = "3")]
#[prost(bytes, tag = "2")]
pub raw: Vec<u8>,

/// ContentEncoding is encoding used for the raw data (empty for protobuf)
#[prost(string, tag = "4")]
#[prost(string, tag = "3")]
pub content_encoding: String,

/// ContentType specifies the media type of Raw. If empty, "application/vnd.kubernetes.protobuf"
#[prost(string, tag = "5")]
#[prost(string, tag = "4")]
pub content_type: String,
}

Expand Down Expand Up @@ -83,8 +103,10 @@ pub fn encode_protobuf<T: Serialize>(

// Create Unknown wrapper with type metadata
let unknown = Unknown {
api_version: api_version.to_string(),
kind: kind.to_string(),
type_meta: Some(UnknownTypeMeta {
api_version: api_version.to_string(),
kind: kind.to_string(),
}),
raw: json_bytes,
content_encoding: String::new(),
content_type: "application/json".to_string(), // Indicates raw contains JSON
Expand Down Expand Up @@ -121,11 +143,19 @@ pub fn decode_protobuf<T: for<'de> Deserialize<'de>>(data: &[u8]) -> Result<(T,
let unknown = Unknown::decode(&data[PROTOBUF_MAGIC.len()..])
.map_err(|e| format!("Failed to decode protobuf: {}", e))?;

// Extract type metadata
let type_meta = TypeMeta {
api_version: unknown.api_version.clone(),
kind: unknown.kind.clone(),
};
// Extract type metadata from the nested message (defaults if absent —
// matches upstream Unknown decode which treats typeMeta as optional).
let type_meta = unknown
.type_meta
.as_ref()
.map(|tm| TypeMeta {
api_version: tm.api_version.clone(),
kind: tm.kind.clone(),
})
.unwrap_or_else(|| TypeMeta {
api_version: String::new(),
kind: String::new(),
});

// Deserialize the object from raw bytes
// In our implementation, raw contains JSON
Expand All @@ -149,10 +179,16 @@ pub fn extract_type_meta(data: &[u8]) -> Result<TypeMeta, String> {
let unknown = Unknown::decode(&data[PROTOBUF_MAGIC.len()..])
.map_err(|e| format!("Failed to decode protobuf: {}", e))?;

Ok(TypeMeta {
api_version: unknown.api_version,
kind: unknown.kind,
})
Ok(unknown
.type_meta
.map(|tm| TypeMeta {
api_version: tm.api_version,
kind: tm.kind,
})
.unwrap_or_else(|| TypeMeta {
api_version: String::new(),
kind: String::new(),
}))
}

#[cfg(test)]
Expand Down Expand Up @@ -236,8 +272,10 @@ mod tests {
fn test_unknown_message() {
// Test direct Unknown encoding/decoding
let unknown = Unknown {
api_version: "v1".to_string(),
kind: "Pod".to_string(),
type_meta: Some(UnknownTypeMeta {
api_version: "v1".to_string(),
kind: "Pod".to_string(),
}),
raw: b"{\"test\": \"data\"}".to_vec(),
content_encoding: String::new(),
content_type: "application/json".to_string(),
Expand All @@ -249,4 +287,45 @@ mod tests {
let decoded = Unknown::decode(&buf[..]).expect("Failed to decode");
assert_eq!(decoded, unknown);
}

/// Wire-format compatibility: Unknown.typeMeta must be at field 1 as a
/// nested message (apiVersion=1, kind=2), Unknown.raw at field 2. This
/// is what every upstream client decoder expects — getting it wrong
/// produces `proto: illegal wireType 6` when the client tries to parse
/// the apiVersion string bytes as a nested TypeMeta. See module docs
/// on `Unknown`.
#[test]
fn test_unknown_wire_format_matches_upstream() {
let mut buf = Vec::new();
let unknown = Unknown {
type_meta: Some(UnknownTypeMeta {
api_version: "v1".to_string(),
kind: "Pod".to_string(),
}),
raw: b"{}".to_vec(),
content_encoding: String::new(),
content_type: "application/json".to_string(),
};
unknown.encode(&mut buf).expect("encode");

// First emitted bytes MUST be the field-1 tag for an embedded
// message: tag = (1 << 3) | 2 = 0x0A. (Old broken layout emitted
// 0x0A 0x02 'v' '1' as a *string* — same tag byte, but the inner
// payload is then parsed by clients as a TypeMeta proto message
// whose first byte 'v'=0x76 carries the illegal wireType 6.)
assert_eq!(
buf[0], 0x0A,
"Unknown.typeMeta must be embedded message at field 1; got tag byte 0x{:02x}",
buf[0],
);

// The embedded TypeMeta should contain field 1 (apiVersion="v1")
// then field 2 (kind="Pod"). Decode it back via the nested message
// type to be sure the wire shape round-trips.
let inner_len = buf[1] as usize;
let inner = &buf[2..2 + inner_len];
let typed = UnknownTypeMeta::decode(inner).expect("typeMeta decode");
assert_eq!(typed.api_version, "v1");
assert_eq!(typed.kind, "Pod");
}
}
Loading