Skip to content

feat: Implement Least Request Load Balancing Policy (gRFC A48)#2651

Open
emil10001 wants to merge 3 commits into
grpc:masterfrom
emil10001:master
Open

feat: Implement Least Request Load Balancing Policy (gRFC A48)#2651
emil10001 wants to merge 3 commits into
grpc:masterfrom
emil10001:master

Conversation

@emil10001
Copy link
Copy Markdown
Member

Implements the "all weights equal" Least Request Load Balancing policy in gRPC-Rust, in compliance with gRFC A48. The Least Request policy improves tail latencies in heterogeneous environments by tracking active request counts per endpoint and directing new requests to the backend with the lowest load.

Detailed Changes:

  1. Core Load Balancing Policy (least_request.rs):

    • Defined LeastRequestLoadBalancingConfig to parse and validate the choiceCount config parameter (default = 2, clamped from 2 to 10).
    • Implemented LeastRequestBuilder registering policy name least_request_experimental.
    • Implemented LeastRequestPolicy managing endpoint-level connections via ChildManager children delegating to pick_first.
    • Maintained a persistent mapping of weak subchannel references to active request counters (subchannel_counters) so that outstanding request metrics survive picker updates and name re-resolutions.
    • Implemented LeastRequestPicker utilizing a random sampling selection algorithm over choice_count subchannels.
  2. Active Request Cancellation Safety:

    • Identified and resolved a request counter leak bug where async task cancellations during dyn_invoke.await dropped the Pick closure without calling it.
    • Implemented a custom, defusable ActiveRequestGuard using an AtomicBool inside LeastRequestPicker::pick. The guard guarantees that the active request count is decremented upon drop if the picker's on_complete callback is never invoked.
  3. Channel & Service Config Integration:

    • Registered the builder with the global LB registry in Channel::new inside channel.rs.
    • Added CallbackRecvStream wrapping the stream in the channel's Invoke implementation to trigger on_complete callbacks when client streams are completed or dropped.
    • Added LeastRequest variant to LbPolicyType enum in service_config.rs.
    • Mapped LbPolicyType::LeastRequest configuration inside ResolverChannelController::update in channel.rs.
  4. Test Additions & Verification:

    • Added comprehensive unit tests in least_request.rs covering configuration parsing/clamping/validation, least request selection, tie-breaking, fewer subchannels than choice count, and cancellation drop-guard safety.
    • Modified the InMemoryResolver in inmemory/mod.rs to dynamically set the LeastRequest load-balancing policy based on target URI path prefixes.
    • Wrote a robust E2E integration test test_in_memory_least_request_load_balancing in inmemory/mod.rs verifying dynamic load balancing across multiple in-memory backends concurrently.

Motivation

Solution

Implements the "all weights equal" Least Request Load Balancing policy in
gRPC-Rust, in compliance with gRFC A48. The Least Request policy improves tail
latencies in heterogeneous environments by tracking active request counts per
endpoint and directing new requests to the backend with the lowest load.

Detailed Changes:

1. Core Load Balancing Policy (`least_request.rs`):
   - Defined `LeastRequestLoadBalancingConfig` to parse and validate the
     `choiceCount` config parameter (default = 2, clamped from 2 to 10).
   - Implemented `LeastRequestBuilder` registering policy name
     `least_request_experimental`.
   - Implemented `LeastRequestPolicy` managing endpoint-level connections
     via `ChildManager` children delegating to `pick_first`.
   - Maintained a persistent mapping of weak subchannel references to active
     request counters (`subchannel_counters`) so that outstanding request
     metrics survive picker updates and name re-resolutions.
   - Implemented `LeastRequestPicker` utilizing a random sampling selection
     algorithm over `choice_count` subchannels.

2. Active Request Cancellation Safety:
   - Identified and resolved a request counter leak bug where async task
     cancellations during `dyn_invoke.await` dropped the `Pick` closure
     without calling it.
   - Implemented a custom, defusable `ActiveRequestGuard` using an `AtomicBool`
     inside `LeastRequestPicker::pick`. The guard guarantees that the active
     request count is decremented upon drop if the picker's `on_complete`
     callback is never invoked.

3. Channel & Service Config Integration:
   - Registered the builder with the global LB registry in `Channel::new`
     inside `channel.rs`.
   - Added `CallbackRecvStream` wrapping the stream in the channel's
     `Invoke` implementation to trigger `on_complete` callbacks when client
     streams are completed or dropped.
   - Added `LeastRequest` variant to `LbPolicyType` enum in `service_config.rs`.
   - Mapped `LbPolicyType::LeastRequest` configuration inside
     `ResolverChannelController::update` in `channel.rs`.

4. Test Additions & Verification:
   - Added comprehensive unit tests in `least_request.rs` covering configuration
     parsing/clamping/validation, least request selection, tie-breaking,
     fewer subchannels than choice count, and cancellation drop-guard safety.
   - Modified the `InMemoryResolver` in `inmemory/mod.rs` to dynamically set
     the `LeastRequest` load-balancing policy based on target URI path prefixes.
   - Wrote a robust E2E integration test `test_in_memory_least_request_load_balancing`
     in `inmemory/mod.rs` verifying dynamic load balancing across multiple
     in-memory backends concurrently.
Copy link
Copy Markdown
Member

@dfawley dfawley left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the PR! I haven't looked at the tests yet, but here's a first round of review comments.

#[default]
PickFirst,
RoundRobin,
LeastRequest,
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a pre-defined list of LB policies limited to just PF and RR. But I think we can do this for now if we put a TODO to remove. cc @nathanielford for when he implements support for the newer field that lets you specify any policy.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think I'd prefer to drop it now. It can be added back if it makes sense down the road.

Comment thread grpc/src/inmemory/mod.rs
fn build(&self, target: &Target, options: ResolverOptions) -> Box<dyn Resolver> {
let path = target.path().strip_prefix('/').unwrap_or(target.path());
let ids: Vec<String> = path.split(',').map(|s| s.to_string()).collect();
let (lb_policy, rest) = if let Some(stripped) = path.strip_prefix("leastrequest/") {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The LB policy should get set by specifying a default service config, not through the in memory resolver. We don't have that yet, though, so this is fine for now since this module is unexported and unused except in tests.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Leaving this as a TODO comment.

fn parse_config(
&self,
config: &ParsedJsonLbConfig,
) -> Result<Option<<Self::LbPolicy as LbPolicy>::LbConfig>, String> {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe you can just name the type here if you want to avoid all the colons and "as"es.. Result<LeastRequestLoadBalancingConfig, String>?

Comment on lines +100 to +101
let choice_count = parsed.choice_count.min(10);
Ok(Some(LeastRequestLoadBalancingConfig { choice_count }))
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Optional: A style choice. This could also be parsed.choice_count = ...min(10) and then return Ok(Some(parsed)). (parsed would need to be declared as mut)

pub(crate) struct LeastRequestPolicy {
child_manager: ChildManager<Endpoint>,
pick_first_builder: Arc<DynLbPolicyBuilder>,
choice_count: u32,
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider storing an Option<Config> here instead of the flat config fields.

Comment on lines +374 to +376
PickResult::Pick(crate::client::load_balancing::Pick {
subchannel: selected.subchannel.clone(),
metadata: crate::metadata::MetadataMap::new(),
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

use please.

Comment on lines +152 to +154
.state
.picker
.pick(&crate::core::RequestHeaders::default())
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should not be making picks for fake RPCs. If we can't map via the child itself, then we'll need another way to retrieve its active subchannel.

Comment on lines +170 to +172
// Clean up stale counters
self.subchannel_counters
.retain(|weak, _| weak.upgrade().is_some());
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this would be better if it reused the known-active subchannels that we got above instead of keeping all the ones that happen to still upgrade.

}
let aggregate_state = self.child_manager.aggregate_states();

if aggregate_state == ConnectivityState::Ready {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Optional: maybe let picker = if ... and send the update commonly between the if/else.

Comment on lines +184 to +191
let picker = self
.child_manager
.children()
.find(|cs| cs.state.connectivity_state == aggregate_state)
.map(|cs| cs.state.picker.clone())
.unwrap_or_else(|| {
Arc::new(crate::client::load_balancing::QueuingPicker) as Arc<dyn Picker>
});
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What picker is coming out of this? Is it just the first child whose state is the same as the aggregate state? We should round-robin over all of those children that match instead. You should ideally share the existing RoundRobin picker for this.

@emil10001
Copy link
Copy Markdown
Member Author

/gemini review

Copy link
Copy Markdown

@gemini-code-assist gemini-code-assist Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request introduces a new "least request" load balancing policy (least_request.rs) to the gRPC client, integrates it into the channel controller and service configuration, and adds comprehensive unit and integration tests. The code review feedback identifies several critical improvement opportunities: a bug where configuration updates (such as choice_count) are not propagated to the active picker because update_picker returns early when child states are unchanged; an issue where parent attributes are discarded when sharding resolver updates for child policies; and a recommendation to sample distinct subchannels (without replacement) to maximize the effectiveness of the "power of two choices" algorithm.

Comment on lines +141 to +144
fn update_picker(&mut self, channel_controller: &mut dyn ChannelController) {
if !self.child_manager.child_updated() {
return;
}
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

The update_picker method currently returns early if self.child_manager.child_updated() is false. However, if a resolver update only changes the choice_count configuration without changing any child states or endpoints, child_updated() will be false. This means the new choice_count will not be propagated to the active picker, and the policy will continue using the old choice_count.

To fix this, we should add a force parameter to update_picker and set it to true when called from resolver_update.

Suggested change
fn update_picker(&mut self, channel_controller: &mut dyn ChannelController) {
if !self.child_manager.child_updated() {
return;
}
fn update_picker(&mut self, channel_controller: &mut dyn ChannelController, force: bool) {
if !self.child_manager.child_updated() && !force {
return;
}

let _ = self
.child_manager
.resolver_update(resolver_update, None, channel_controller);
self.update_picker(channel_controller);
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

Pass false to update_picker to match the new signature and avoid forcing an update when handling resolver errors.

Suggested change
self.update_picker(channel_controller);
self.update_picker(channel_controller, false);

return Err(err.into());
}

self.update_picker(channel_controller);
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

Pass true to update_picker to force a picker update when a resolver update occurs, ensuring that any changes to choice_count are propagated even if child states are unchanged.

Suggested change
self.update_picker(channel_controller);
self.update_picker(channel_controller, true);

) {
self.child_manager
.subchannel_update(subchannel, state, channel_controller);
self.update_picker(channel_controller);
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

Pass false to update_picker to match the new signature.

Suggested change
self.update_picker(channel_controller);
self.update_picker(channel_controller, false);


fn work(&mut self, channel_controller: &mut dyn ChannelController) {
self.child_manager.work(channel_controller);
self.update_picker(channel_controller);
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

Pass false to update_picker to match the new signature.

Suggested change
self.update_picker(channel_controller);
self.update_picker(channel_controller, false);


fn exit_idle(&mut self, channel_controller: &mut dyn ChannelController) {
self.child_manager.exit_idle(channel_controller);
self.update_picker(channel_controller);
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

Pass false to update_picker to match the new signature.

Suggested change
self.update_picker(channel_controller);
self.update_picker(channel_controller, false);

Comment on lines +245 to +250
let update = ResolverUpdate {
attributes: crate::attributes::Attributes::default(),
endpoints: Ok(vec![e.clone()]),
service_config: update.service_config.clone(),
resolution_note: None,
};
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

When sharding the resolver update for child policies, the parent's update.attributes are discarded and replaced with Attributes::default(). This can cause important channel-level or security attributes resolved by the name resolver to be lost for the child policies.

We should propagate update.attributes.clone() to the child updates instead.

Suggested change
let update = ResolverUpdate {
attributes: crate::attributes::Attributes::default(),
endpoints: Ok(vec![e.clone()]),
service_config: update.service_config.clone(),
resolution_note: None,
};
let update = ResolverUpdate {
attributes: update.attributes.clone(),
endpoints: Ok(vec![e.clone()]),
service_config: update.service_config.clone(),
resolution_note: None,
};

Comment on lines +321 to +338
let sample_limit = self.choice_count.min(len);
let mut best_idx: Option<usize> = None;
let mut best_active_requests = usize::MAX;

for _ in 0..sample_limit {
let idx = if len == 1 {
0
} else {
rand::random_range(0..len)
};
let active_reqs = self.subchannels[idx]
.active_requests
.load(Ordering::Relaxed);
if best_idx.is_none() || active_reqs < best_active_requests {
best_idx = Some(idx);
best_active_requests = active_reqs;
}
}
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The current random sampling algorithm uses sampling with replacement (rand::random_range(0..len) in a loop). When the number of subchannels is small (e.g., 2 or 3), there is a high probability of selecting the same subchannel multiple times, which reduces the effectiveness of the "power of two choices" algorithm.

We can improve this by tracking selected indices and ensuring we only sample distinct subchannels up to sample_limit.

        let sample_limit = self.choice_count.min(len);
        let mut best_idx: Option<usize> = None;
        let mut best_active_requests = usize::MAX;

        let mut selected_indices = Vec::with_capacity(sample_limit);
        while selected_indices.len() < sample_limit {
            let idx = if len == 1 {
                0
            } else {
                rand::random_range(0..len)
            };
            if !selected_indices.contains(&idx) {
                selected_indices.push(idx);
                let active_reqs = self.subchannels[idx]
                    .active_requests
                    .load(Ordering::Relaxed);
                if best_idx.is_none() || active_reqs < best_active_requests {
                    best_idx = Some(idx);
                    best_active_requests = active_reqs;
                }
            }
        }

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants