Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
179 changes: 129 additions & 50 deletions cmds/rtk/src/jsonnet/evaluator/jrsonnet/builtins.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use std::{
use jrsonnet_evaluator::{
error::{ErrorKind::*, Result},
manifest::JsonFormat,
val::{ArrValue, NumValue},
IStr, ObjValue, Thunk, Val,
};
use jrsonnet_macros::builtin;
Expand Down Expand Up @@ -122,14 +123,17 @@ pub fn helm_cache_put_json(key: String, json: String) {

/// State of a single `rtkMemoize` cache slot.
///
/// The value is stored as a JSON string (not a `Val`) because `Val` is not
/// `Send`/`Sync` (it uses `Rc` internally), while the cache is shared across
/// worker threads.
/// The value is stored as an [`OwnedVal`] snapshot (not a `Val`) because `Val`
/// is not `Send`/`Sync` (it uses `Rc`/`Cc` internally and is bound to the
/// evaluating thread), while the cache is shared across worker threads. The
/// snapshot is shared via `Arc` so cache hits clone a pointer rather than the
/// whole tree, and a fresh `Val` is rebuilt from it in each consuming thread.
enum MemoState {
/// A worker is currently evaluating the value for this key.
Computing,
/// The value has been computed and is available as JSON.
Done(String),
/// The value has been computed and is available as a thread-portable
/// snapshot.
Done(Arc<OwnedVal>),
/// Computation failed; the next waiter should retry the computation.
Failed,
}
Expand Down Expand Up @@ -191,42 +195,114 @@ impl Drop for MemoComputeGuard<'_> {
}
}

/// Recursively check whether a value contains any hidden object field.
/// Thread-portable, owned snapshot of a [`Val`].
///
/// The memoization cache stores values as JSON, which silently drops hidden
/// (`::`) fields. To avoid surprising data loss we reject such values up front
/// instead of caching a lossy projection. Hidden fields are detected by name
/// (comparing the full field set against the visible one), so a hidden field's
/// value is never evaluated.
fn contains_hidden_field(val: &Val) -> Result<bool> {
match val {
/// `Val` is `Rc`/`Cc`-based and tied to the thread that evaluated it, so it
/// cannot live in the cross-worker memo cache directly. `OwnedVal` is a deep
/// copy made of only owned, `Send + Sync` data, mirroring the jsonnet value
/// model closely enough to round-trip without the lossy JSON projection the
/// cache used previously. In particular, object field visibility (hidden `::`)
/// is preserved, so memoized values keep their hidden fields.
///
/// Functions are intentionally not representable: a `FuncVal` captures a
/// closure `Context` (free variables, `self`/`super`, `std`, imports) plus Rust
/// builtins, none of which can be turned into thread-independent data. A value
/// containing any function (at any depth) is therefore rejected at snapshot
/// time rather than cached lossily.
enum OwnedVal {
/// Jsonnet `null`. `val_to_owned` also emits this as a placeholder at a
/// position where it found a function (after recording that position's
/// path), so a snapshot is only meaningful when no function paths were
/// collected.
Null,
/// Jsonnet boolean, copied by value.
Bool(bool),
/// Jsonnet number as a raw `f64`. `owned_to_val` rebuilds it with
/// `NumValue::new(..).expect(..)`, so the stored value is expected to be one
/// that `NumValue::new` accepts (it came out of a `NumValue` originally).
Num(f64),
/// Jsonnet string, stored as an owned, flattened copy.
Str(String),
/// Array elements in index order, each already converted to an `OwnedVal`.
Arr(Vec<OwnedVal>),
/// Object fields as `(name, hidden, value)` triples, in the order the
/// source object listed them when the snapshot was taken. The flag marks a
/// hidden (`::`) field; `owned_to_val` re-applies it when rebuilding.
Obj(Vec<(String, bool, OwnedVal)>),
}

/// Deep-copy a fully-evaluated `Val` into a thread-portable [`OwnedVal`].
///
/// Every array element and object field (including hidden ones) is forced, so
/// the snapshot is a complete, eager copy. Functions cannot be snapshotted (see
/// [`OwnedVal`]); rather than failing on the first one, every function's path
/// (e.g. `spec.template.withName`) is appended to `funcs` so the caller can
/// report all of them at once. `path` is the dotted/indexed location of `val`
/// within the memoized value (empty at the root).
fn val_to_owned(funcs: &mut Vec<String>, path: &str, val: &Val) -> Result<OwnedVal> {
Ok(match val {
Val::Null => OwnedVal::Null,
Val::Bool(b) => OwnedVal::Bool(*b),
Val::Num(n) => OwnedVal::Num(n.get()),
Val::Str(s) => OwnedVal::Str(s.clone().into_flat().to_string()),
Val::Arr(arr) => {
let mut items = Vec::with_capacity(arr.len());
for (idx, item) in arr.iter().enumerate() {
let child = format!("{path}[{idx}]");
items.push(val_to_owned(funcs, &child, &item?)?);
}
OwnedVal::Arr(items)
}
Val::Obj(obj) => {
// A field present with hidden included but absent from the visible
// set is hidden. `:::` (unhide) collapses to a plain visible field,
// which is correct for a flattened snapshot with no further supers.
let visible: HashSet<IStr> = obj.fields_ex(false).into_iter().collect();
let all = obj.fields_ex(true);
// `all` is always a superset of `visible`; a size difference means
// at least one field is hidden.
if all.len() != visible.len() {
return Ok(true);
}
let mut fields = Vec::with_capacity(all.len());
for name in all {
let hidden = !visible.contains(&name);
let field = obj
.get(name)?
.get(name.clone())?
.expect("field listed by fields_ex must exist");
if contains_hidden_field(&field)? {
return Ok(true);
}
let child = if path.is_empty() {
name.to_string()
} else {
format!("{path}.{name}")
};
fields.push((
name.to_string(),
hidden,
val_to_owned(funcs, &child, &field)?,
));
}
Ok(false)
OwnedVal::Obj(fields)
}
Val::Arr(arr) => {
for item in arr.iter() {
if contains_hidden_field(&item?)? {
return Ok(true);
Val::Func(_) => {
funcs.push(if path.is_empty() {
"<root>".to_string()
} else {
path.to_string()
});
OwnedVal::Null
}
})
}

/// Rebuild a fresh `Val` in the current thread from an [`OwnedVal`] snapshot.
///
/// Object fields are recreated with their original visibility via the object
/// builder, so hidden fields stay hidden.
fn owned_to_val(owned: &OwnedVal) -> Val {
match owned {
OwnedVal::Null => Val::Null,
OwnedVal::Bool(b) => Val::Bool(*b),
OwnedVal::Num(n) => {
Val::Num(NumValue::new(*n).expect("snapshot numbers were finite when captured"))
}
OwnedVal::Str(s) => Val::string(s.as_str()),
OwnedVal::Arr(items) => Val::Arr(ArrValue::eager(items.iter().map(owned_to_val).collect())),
OwnedVal::Obj(fields) => {
let mut builder = ObjValue::builder_with_capacity(fields.len());
for (name, hidden, value) in fields {
let value = owned_to_val(value);
let member = builder.field(name.as_str());
if *hidden {
member.hide().value(value);
} else {
member.value(value);
}
}
Ok(false)
Val::Obj(builder.build())
}
_ => Ok(false),
}
}

Expand All @@ -238,12 +314,13 @@ fn contains_hidden_field(val: &Val) -> Result<bool> {
/// a key, other workers requesting the same key block until the result is
/// ready, then reuse it without re-evaluating their own thunk.
///
/// The cached value is stored as JSON, so the returned value is always the
/// JSON-manifested projection of the thunk. Because that projection silently
/// drops hidden (`::`) fields, a value containing any hidden field (at any
/// depth) is rejected with an error rather than cached lossily. The same JSON
/// value is returned to the computing worker too, so every caller observes an
/// identical value regardless of who wins the race to compute it.
/// The cached value is stored as an [`OwnedVal`] snapshot: a thread-portable
/// deep copy that preserves hidden (`::`) object fields, so memoized values
/// keep them instead of being reduced to their visible JSON projection.
/// Functions cannot be snapshotted (they capture a thread-local closure
/// context), so a value containing any function at any depth is rejected. The
/// same snapshot is rebuilt for the computing worker too, so every caller
/// observes an identical value regardless of who wins the race to compute it.
#[builtin]
pub fn rtk_memoize(key: String, value: Thunk<Val>) -> Result<Val> {
// Guard against a thunk that memoizes its own key on the same thread,
Expand Down Expand Up @@ -285,28 +362,32 @@ pub fn rtk_memoize(key: String, value: Thunk<Val>) -> Result<Val> {
completed: false,
};

// Evaluate the (lazy) thunk and fully manifest it to JSON so the
// result can be shared with other threads. The slot lock is NOT
// held during evaluation, so other keys make progress and waiters
// on this key simply block on the condvar.
// Evaluate the (lazy) thunk and deep-copy it into a thread-portable
// snapshot so the result can be shared with other threads. The slot
// lock is NOT held during evaluation, so other keys make progress
// and waiters on this key simply block on the condvar.
let evaluated = value.evaluate()?;
if contains_hidden_field(&evaluated)? {
let mut funcs = Vec::new();
let snapshot = val_to_owned(&mut funcs, "", &evaluated)?;
if !funcs.is_empty() {
funcs.sort_unstable();
funcs.dedup();
return Err(RuntimeError(
format!(
"rtkMemoize: value for key {key:?} contains hidden field(s), \
which cannot be memoized (they would be dropped by JSON serialization)"
"rtkMemoize: value for key {key:?} contains function(s), which cannot be \
memoized (functions capture a thread-local closure context); found at: {}",
funcs.join(", ")
)
.into(),
)
.into());
}
let json = evaluated.manifest(JsonFormat::default())?;
let result: Val = serde_json::from_str(&json)
.map_err(|e| RuntimeError(format!("failed to parse memoized value: {e}").into()))?;
let snapshot = Arc::new(snapshot);
let result = owned_to_val(&snapshot);

{
let mut state = slot.state.lock().unwrap_or_else(|e| e.into_inner());
*state = MemoState::Done(json);
*state = MemoState::Done(snapshot);
slot.cond.notify_all();
}
guard.completed = true;
Expand All @@ -317,10 +398,8 @@ pub fn rtk_memoize(key: String, value: Thunk<Val>) -> Result<Val> {
let mut state = slot.state.lock().unwrap_or_else(|e| e.into_inner());
loop {
match &*state {
MemoState::Done(json) => {
return serde_json::from_str(json).map_err(|e| {
RuntimeError(format!("failed to parse memoized value: {e}").into()).into()
});
MemoState::Done(snapshot) => {
return Ok(owned_to_val(snapshot));
}
// The computing worker failed; retry the whole operation so
// this worker (or another) re-attempts the computation.
Expand Down
Loading
Loading