From af62f5b39a36a3561d4b29f22663affcaf14728e Mon Sep 17 00:00:00 2001 From: Benji Visser Date: Wed, 27 May 2026 13:47:33 -0400 Subject: [PATCH 1/3] Handle unavailable ShutdownWorker during graceful poll shutdown --- crates/sdk-core/src/core_tests/mod.rs | 120 ++++++++++++++++++++- crates/sdk-core/src/pollers/poll_buffer.rs | 44 ++++---- crates/sdk-core/src/worker/activities.rs | 15 +-- crates/sdk-core/src/worker/mod.rs | 51 +++++++-- 4 files changed, 178 insertions(+), 52 deletions(-) diff --git a/crates/sdk-core/src/core_tests/mod.rs b/crates/sdk-core/src/core_tests/mod.rs index 4b5d14334..ccae40a2f 100644 --- a/crates/sdk-core/src/core_tests/mod.rs +++ b/crates/sdk-core/src/core_tests/mod.rs @@ -13,11 +13,18 @@ use crate::{ }, worker::{ PollerBehavior, - client::mocks::{mock_manual_worker_client, mock_worker_client}, + client::mocks::{ + DEFAULT_WORKERS_REGISTRY, MockManualWorkerClient, mock_manual_worker_client, + mock_worker_client, + }, }, }; use futures_util::FutureExt; -use std::{sync::LazyLock, time::Duration}; +use std::{ + future, + sync::{Arc, LazyLock}, + time::Duration, +}; use temporalio_common::protos::{ TestHistoryBuilder, canned_histories, coresdk::{ @@ -25,10 +32,20 @@ use temporalio_common::protos::{ workflow_completion::WorkflowActivationCompletion, }, temporal::api::{ - enums::v1::EventType, history::v1::WorkflowExecutionOptionsUpdatedEventAttributes, + enums::v1::EventType, + history::v1::WorkflowExecutionOptionsUpdatedEventAttributes, + namespace::v1::{NamespaceInfo, namespace_info::Capabilities}, + workflowservice::v1::{ + DescribeNamespaceResponse, PollActivityTaskQueueResponse, + RecordActivityTaskHeartbeatResponse, + }, }, }; -use tokio::{sync::Barrier, time::sleep}; +use temporalio_common::worker::WorkerTaskTypes; +use tokio::{ + sync::{Barrier, Notify}, + time::sleep, +}; #[tokio::test] async fn after_shutdown_server_is_not_polled() { @@ -111,6 +128,101 @@ async fn shutdown_interrupts_both_polls() { }; } +#[tokio::test] +async fn graceful_activity_poll_shutdown_handles_unimplemented_shutdown_worker() { + let activity_poll_started = Arc::new(Notify::new()); + let activity_poll_started_clone = activity_poll_started.clone(); + let shutdown_worker_called = Arc::new(Notify::new()); + let shutdown_worker_called_clone = shutdown_worker_called.clone(); + + let mut mock_client = MockManualWorkerClient::new(); + mock_client.expect_capabilities().returning(|| None); + mock_client + .expect_workers() + .returning(|| DEFAULT_WORKERS_REGISTRY.clone()); + mock_client.expect_is_mock().returning(|| true); + mock_client + .expect_sdk_name_and_version() + .returning(|| ("test-core".to_string(), "0.0.0".to_string())); + mock_client + .expect_identity() + .returning(|| "test-identity".to_string()); + mock_client + .expect_worker_grouping_key() + .returning(uuid::Uuid::new_v4); + mock_client + .expect_worker_instance_key() + .returning(uuid::Uuid::new_v4); + mock_client + .expect_describe_namespace() + .times(1) + .returning(|| { + async { + Ok(DescribeNamespaceResponse { + namespace_info: Some(NamespaceInfo { + capabilities: Some(Capabilities { + worker_poll_complete_on_shutdown: true, + ..Default::default() + }), + ..Default::default() + }), + ..Default::default() + }) + } + .boxed() + }); + mock_client + .expect_shutdown_worker() + .times(1) + .returning(move |_, _, _, _| { + let shutdown_worker_called = shutdown_worker_called_clone.clone(); + async move { + shutdown_worker_called.notify_one(); + Err(tonic::Status::unimplemented( + "ShutdownWorker disabled by server", + )) + } + .boxed() + }); + mock_client + .expect_poll_activity_task() + .times(1) + .returning(move |_, _| { + let activity_poll_started = activity_poll_started_clone.clone(); + async move { + activity_poll_started.notify_one(); + future::pending::>().await + } + .boxed() + }); + mock_client + .expect_record_activity_heartbeat() + .returning(|_, _| async { Ok(RecordActivityTaskHeartbeatResponse::default()) }.boxed()); + + let mut cfg = test_worker_cfg() + .activity_task_poller_behavior(PollerBehavior::SimpleMaximum(1_usize)) + .build() + .unwrap(); + cfg.task_types = WorkerTaskTypes::activity_only(); + let worker = Worker::new_test(cfg, mock_client); + worker.validate().await.unwrap(); + + let poll_fut = async { worker.poll_activity_task().await }; + let shutdown_fut = async { + activity_poll_started.notified().await; + worker.initiate_shutdown(); + shutdown_worker_called.notified().await; + }; + + let (poll_result, _) = tokio::time::timeout(Duration::from_millis(500), async { + tokio::join!(poll_fut, shutdown_fut) + }) + .await + .expect("activity poll remained pending after shutdown_worker returned UNIMPLEMENTED"); + + assert_matches!(poll_result.unwrap_err(), PollError::ShutDown); +} + #[tokio::test] async fn ignores_workflow_options_updated_event() { let mut t = TestHistoryBuilder::default(); diff --git a/crates/sdk-core/src/pollers/poll_buffer.rs b/crates/sdk-core/src/pollers/poll_buffer.rs index a4d1353de..2ef7ddcbb 100644 --- a/crates/sdk-core/src/pollers/poll_buffer.rs +++ b/crates/sdk-core/src/pollers/poll_buffer.rs @@ -387,7 +387,19 @@ where let capabilities = capabilities.clone(); let poll_task = tokio::spawn(async move { let r = if capabilities.graceful_poll_shutdown() { - pf(timeout_override).await + let mut graceful_poll_shutdown_rx = + capabilities.subscribe_graceful_poll_shutdown(); + let shutdown_for_graceful_fallback = shutdown.clone(); + let local_interrupt_after_graceful_disabled = async move { + shutdown_for_graceful_fallback.cancelled().await; + let _ = graceful_poll_shutdown_rx + .wait_for(|enabled| !*enabled) + .await; + }; + tokio::select! { + r = pf(timeout_override) => r, + _ = local_interrupt_after_graceful_disabled => return, + } } else { let poll_interruptor = shutdown.cancelled().then(|_| async move { if let Some(w) = poll_shutdown_interrupt_wait { @@ -884,10 +896,7 @@ mod tests { wft_poller_shared: Some(Arc::new(WFTPollerShared::new(Some(10)))), }, Arc::new(AtomicCell::new(None)), - Arc::new(NamespaceCapabilities { - graceful_poll_shutdown: AtomicBool::new(false), - poller_autoscaling: AtomicBool::new(false), - }), + Arc::new(NamespaceCapabilities::default()), ); // Poll a bunch of times, "interrupting" it each time, we should only actually have polled @@ -944,10 +953,7 @@ mod tests { wft_poller_shared: Some(Arc::new(WFTPollerShared::new(Some(1)))), }, Arc::new(AtomicCell::new(None)), - Arc::new(NamespaceCapabilities { - graceful_poll_shutdown: AtomicBool::new(false), - poller_autoscaling: AtomicBool::new(false), - }), + Arc::new(NamespaceCapabilities::default()), ); // Should not see error, unwraps should get empty response @@ -1024,10 +1030,7 @@ mod tests { wft_poller_shared: Some(Arc::new(WFTPollerShared::new(Some(10)))), }, Arc::new(AtomicCell::new(None)), - Arc::new(NamespaceCapabilities { - graceful_poll_shutdown: AtomicBool::new(false), - poller_autoscaling: AtomicBool::new(false), - }), + Arc::new(NamespaceCapabilities::default()), ); let first_task = pb.poll().await.expect("Should get first task"); @@ -1133,10 +1136,7 @@ mod tests { wft_poller_shared: Some(Arc::new(WFTPollerShared::new(Some(10)))), }, Arc::new(AtomicCell::new(None)), - Arc::new(NamespaceCapabilities { - graceful_poll_shutdown: AtomicBool::new(false), - poller_autoscaling: AtomicBool::new(false), - }), + Arc::new(NamespaceCapabilities::default()), )); // Trigger the first poll to initialize and get the scaling decision @@ -1217,10 +1217,7 @@ mod tests { wft_poller_shared: None, }, Arc::new(AtomicCell::new(None)), - Arc::new(NamespaceCapabilities { - graceful_poll_shutdown: AtomicBool::new(graceful), - poller_autoscaling: AtomicBool::new(false), - }), + Arc::new(NamespaceCapabilities::new(graceful, false)), ); let first = pb.poll().await.unwrap().unwrap(); @@ -1272,10 +1269,7 @@ mod tests { min: minimum, target: AtomicUsize::new(10), ever_saw_scaling_decision: AtomicBool::new(false), - capabilities: Arc::new(NamespaceCapabilities { - graceful_poll_shutdown: AtomicBool::new(false), - poller_autoscaling: AtomicBool::new(supports_autoscaling), - }), + capabilities: Arc::new(NamespaceCapabilities::new(false, supports_autoscaling)), behavior: PollerBehavior::Autoscaling { minimum, maximum: 10, diff --git a/crates/sdk-core/src/worker/activities.rs b/crates/sdk-core/src/worker/activities.rs index e7bdc3d0d..b59b3fd6d 100644 --- a/crates/sdk-core/src/worker/activities.rs +++ b/crates/sdk-core/src/worker/activities.rs @@ -805,10 +805,7 @@ mod tests { max_tps: None, }, Arc::new(AtomicCell::new(None)), - Arc::new(NamespaceCapabilities { - graceful_poll_shutdown: AtomicBool::new(false), - poller_autoscaling: AtomicBool::new(false), - }), + Arc::new(NamespaceCapabilities::default()), ); let atm = WorkerActivityTasks::new( sem.clone(), @@ -901,10 +898,7 @@ mod tests { max_tps: None, }, Arc::new(AtomicCell::new(None)), - Arc::new(NamespaceCapabilities { - graceful_poll_shutdown: AtomicBool::new(false), - poller_autoscaling: AtomicBool::new(false), - }), + Arc::new(NamespaceCapabilities::default()), ); let atm = WorkerActivityTasks::new( sem.clone(), @@ -979,10 +973,7 @@ mod tests { max_tps: None, }, Arc::new(AtomicCell::new(None)), - Arc::new(NamespaceCapabilities { - graceful_poll_shutdown: AtomicBool::new(false), - poller_autoscaling: AtomicBool::new(false), - }), + Arc::new(NamespaceCapabilities::default()), ); let atm = WorkerActivityTasks::new( sem.clone(), diff --git a/crates/sdk-core/src/worker/mod.rs b/crates/sdk-core/src/worker/mod.rs index 0a5ecd5e1..ecd0907e1 100644 --- a/crates/sdk-core/src/worker/mod.rs +++ b/crates/sdk-core/src/worker/mod.rs @@ -434,15 +434,34 @@ pub struct Worker { /// Namespace capabilities discovered via `describe_namespace` during worker validation. pub struct NamespaceCapabilities { - pub(crate) graceful_poll_shutdown: AtomicBool, + pub(crate) graceful_poll_shutdown: watch::Sender, + graceful_poll_shutdown_rx: watch::Receiver, pub(crate) poller_autoscaling: AtomicBool, } impl NamespaceCapabilities { + pub(crate) fn new(graceful_poll_shutdown: bool, poller_autoscaling: bool) -> Self { + let (graceful_poll_shutdown, graceful_poll_shutdown_rx) = + watch::channel(graceful_poll_shutdown); + Self { + graceful_poll_shutdown, + graceful_poll_shutdown_rx, + poller_autoscaling: AtomicBool::new(poller_autoscaling), + } + } + /// Returns true if the server supports graceful poll cancellation on shutdown, so pollers /// can let in-flight polls complete rather than hard-killing them. pub fn graceful_poll_shutdown(&self) -> bool { - self.graceful_poll_shutdown.load(Ordering::Relaxed) + *self.graceful_poll_shutdown_rx.borrow() + } + + pub(crate) fn set_graceful_poll_shutdown(&self, enabled: bool) { + let _ = self.graceful_poll_shutdown.send(enabled); + } + + pub(crate) fn subscribe_graceful_poll_shutdown(&self) -> watch::Receiver { + self.graceful_poll_shutdown.subscribe() } /// Returns true if pollers may scale down on poll timeout even without an explicit scaling @@ -452,6 +471,12 @@ impl NamespaceCapabilities { } } +impl Default for NamespaceCapabilities { + fn default() -> Self { + Self::new(false, false) + } +} + struct AllPermitsTracker { wft_permits: watch::Receiver, act_permits: watch::Receiver, @@ -525,9 +550,7 @@ impl Worker { }); if let Some(caps) = ns_info.and_then(|ns| ns.capabilities) { if caps.worker_poll_complete_on_shutdown { - self.capabilities - .graceful_poll_shutdown - .store(true, Ordering::Relaxed); + self.capabilities.set_graceful_poll_shutdown(true); } if caps.poller_autoscaling { self.capabilities @@ -655,10 +678,7 @@ impl Worker { let wf_sticky_last_suc_poll_time = Arc::new(AtomicCell::new(None)); let act_last_suc_poll_time = Arc::new(AtomicCell::new(None)); let nexus_last_suc_poll_time = Arc::new(AtomicCell::new(None)); - let capabilities = Arc::new(NamespaceCapabilities { - graceful_poll_shutdown: AtomicBool::new(false), - poller_autoscaling: AtomicBool::new(false), - }); + let capabilities = Arc::new(NamespaceCapabilities::default()); let nexus_slots = MeteredPermitDealer::new( tuner.nexus_task_slot_supplier(), @@ -1439,23 +1459,32 @@ impl Worker { .heartbeat_manager .as_ref() .map(|hm| hm.heartbeat_callback.clone()()); + let capabilities = self.capabilities.clone(); let handle = tokio::spawn(async move { match client .shutdown_worker(sticky_name, task_queue, task_queue_types, heartbeat) .await { Err(err) - if !matches!( + if matches!( err.code(), tonic::Code::Unimplemented | tonic::Code::Unavailable ) => { + capabilities.set_graceful_poll_shutdown(false); + debug!( + "shutdown_worker rpc unavailable during worker shutdown; \ + falling back to local poll shutdown: {:?}", + err + ); + } + Err(err) => { warn!( "shutdown_worker rpc errored during worker shutdown: {:?}", err ); } - _ => {} + Ok(_) => {} } }); *guard = Some(handle); From 2f3c82709451baad3c7242e329fda1162dfc72d7 Mon Sep 17 00:00:00 2001 From: Benji Visser Date: Wed, 27 May 2026 13:54:20 -0400 Subject: [PATCH 2/3] Narrow graceful shutdown fallback --- crates/sdk-core/src/pollers/poll_buffer.rs | 38 ++++++++++++++------ crates/sdk-core/src/worker/activities.rs | 15 ++++++-- crates/sdk-core/src/worker/mod.rs | 42 +++++++--------------- 3 files changed, 51 insertions(+), 44 deletions(-) diff --git a/crates/sdk-core/src/pollers/poll_buffer.rs b/crates/sdk-core/src/pollers/poll_buffer.rs index 2ef7ddcbb..fb2ca0129 100644 --- a/crates/sdk-core/src/pollers/poll_buffer.rs +++ b/crates/sdk-core/src/pollers/poll_buffer.rs @@ -387,14 +387,12 @@ where let capabilities = capabilities.clone(); let poll_task = tokio::spawn(async move { let r = if capabilities.graceful_poll_shutdown() { - let mut graceful_poll_shutdown_rx = - capabilities.subscribe_graceful_poll_shutdown(); let shutdown_for_graceful_fallback = shutdown.clone(); let local_interrupt_after_graceful_disabled = async move { shutdown_for_graceful_fallback.cancelled().await; - let _ = graceful_poll_shutdown_rx - .wait_for(|enabled| !*enabled) - .await; + while capabilities.graceful_poll_shutdown() { + tokio::time::sleep(Duration::from_millis(10)).await; + } }; tokio::select! { r = pf(timeout_override) => r, @@ -896,7 +894,10 @@ mod tests { wft_poller_shared: Some(Arc::new(WFTPollerShared::new(Some(10)))), }, Arc::new(AtomicCell::new(None)), - Arc::new(NamespaceCapabilities::default()), + Arc::new(NamespaceCapabilities { + graceful_poll_shutdown: AtomicBool::new(false), + poller_autoscaling: AtomicBool::new(false), + }), ); // Poll a bunch of times, "interrupting" it each time, we should only actually have polled @@ -953,7 +954,10 @@ mod tests { wft_poller_shared: Some(Arc::new(WFTPollerShared::new(Some(1)))), }, Arc::new(AtomicCell::new(None)), - Arc::new(NamespaceCapabilities::default()), + Arc::new(NamespaceCapabilities { + graceful_poll_shutdown: AtomicBool::new(false), + poller_autoscaling: AtomicBool::new(false), + }), ); // Should not see error, unwraps should get empty response @@ -1030,7 +1034,10 @@ mod tests { wft_poller_shared: Some(Arc::new(WFTPollerShared::new(Some(10)))), }, Arc::new(AtomicCell::new(None)), - Arc::new(NamespaceCapabilities::default()), + Arc::new(NamespaceCapabilities { + graceful_poll_shutdown: AtomicBool::new(false), + poller_autoscaling: AtomicBool::new(false), + }), ); let first_task = pb.poll().await.expect("Should get first task"); @@ -1136,7 +1143,10 @@ mod tests { wft_poller_shared: Some(Arc::new(WFTPollerShared::new(Some(10)))), }, Arc::new(AtomicCell::new(None)), - Arc::new(NamespaceCapabilities::default()), + Arc::new(NamespaceCapabilities { + graceful_poll_shutdown: AtomicBool::new(false), + poller_autoscaling: AtomicBool::new(false), + }), )); // Trigger the first poll to initialize and get the scaling decision @@ -1217,7 +1227,10 @@ mod tests { wft_poller_shared: None, }, Arc::new(AtomicCell::new(None)), - Arc::new(NamespaceCapabilities::new(graceful, false)), + Arc::new(NamespaceCapabilities { + graceful_poll_shutdown: AtomicBool::new(graceful), + poller_autoscaling: AtomicBool::new(false), + }), ); let first = pb.poll().await.unwrap().unwrap(); @@ -1269,7 +1282,10 @@ mod tests { min: minimum, target: AtomicUsize::new(10), ever_saw_scaling_decision: AtomicBool::new(false), - capabilities: Arc::new(NamespaceCapabilities::new(false, supports_autoscaling)), + capabilities: Arc::new(NamespaceCapabilities { + graceful_poll_shutdown: AtomicBool::new(false), + poller_autoscaling: AtomicBool::new(supports_autoscaling), + }), behavior: PollerBehavior::Autoscaling { minimum, maximum: 10, diff --git a/crates/sdk-core/src/worker/activities.rs b/crates/sdk-core/src/worker/activities.rs index b59b3fd6d..e7bdc3d0d 100644 --- a/crates/sdk-core/src/worker/activities.rs +++ b/crates/sdk-core/src/worker/activities.rs @@ -805,7 +805,10 @@ mod tests { max_tps: None, }, Arc::new(AtomicCell::new(None)), - Arc::new(NamespaceCapabilities::default()), + Arc::new(NamespaceCapabilities { + graceful_poll_shutdown: AtomicBool::new(false), + poller_autoscaling: AtomicBool::new(false), + }), ); let atm = WorkerActivityTasks::new( sem.clone(), @@ -898,7 +901,10 @@ mod tests { max_tps: None, }, Arc::new(AtomicCell::new(None)), - Arc::new(NamespaceCapabilities::default()), + Arc::new(NamespaceCapabilities { + graceful_poll_shutdown: AtomicBool::new(false), + poller_autoscaling: AtomicBool::new(false), + }), ); let atm = WorkerActivityTasks::new( sem.clone(), @@ -973,7 +979,10 @@ mod tests { max_tps: None, }, Arc::new(AtomicCell::new(None)), - Arc::new(NamespaceCapabilities::default()), + Arc::new(NamespaceCapabilities { + graceful_poll_shutdown: AtomicBool::new(false), + poller_autoscaling: AtomicBool::new(false), + }), ); let atm = WorkerActivityTasks::new( sem.clone(), diff --git a/crates/sdk-core/src/worker/mod.rs b/crates/sdk-core/src/worker/mod.rs index ecd0907e1..27e19a796 100644 --- a/crates/sdk-core/src/worker/mod.rs +++ b/crates/sdk-core/src/worker/mod.rs @@ -434,34 +434,15 @@ pub struct Worker { /// Namespace capabilities discovered via `describe_namespace` during worker validation. pub struct NamespaceCapabilities { - pub(crate) graceful_poll_shutdown: watch::Sender, - graceful_poll_shutdown_rx: watch::Receiver, + pub(crate) graceful_poll_shutdown: AtomicBool, pub(crate) poller_autoscaling: AtomicBool, } impl NamespaceCapabilities { - pub(crate) fn new(graceful_poll_shutdown: bool, poller_autoscaling: bool) -> Self { - let (graceful_poll_shutdown, graceful_poll_shutdown_rx) = - watch::channel(graceful_poll_shutdown); - Self { - graceful_poll_shutdown, - graceful_poll_shutdown_rx, - poller_autoscaling: AtomicBool::new(poller_autoscaling), - } - } - /// Returns true if the server supports graceful poll cancellation on shutdown, so pollers /// can let in-flight polls complete rather than hard-killing them. pub fn graceful_poll_shutdown(&self) -> bool { - *self.graceful_poll_shutdown_rx.borrow() - } - - pub(crate) fn set_graceful_poll_shutdown(&self, enabled: bool) { - let _ = self.graceful_poll_shutdown.send(enabled); - } - - pub(crate) fn subscribe_graceful_poll_shutdown(&self) -> watch::Receiver { - self.graceful_poll_shutdown.subscribe() + self.graceful_poll_shutdown.load(Ordering::Relaxed) } /// Returns true if pollers may scale down on poll timeout even without an explicit scaling @@ -471,12 +452,6 @@ impl NamespaceCapabilities { } } -impl Default for NamespaceCapabilities { - fn default() -> Self { - Self::new(false, false) - } -} - struct AllPermitsTracker { wft_permits: watch::Receiver, act_permits: watch::Receiver, @@ -550,7 +525,9 @@ impl Worker { }); if let Some(caps) = ns_info.and_then(|ns| ns.capabilities) { if caps.worker_poll_complete_on_shutdown { - self.capabilities.set_graceful_poll_shutdown(true); + self.capabilities + .graceful_poll_shutdown + .store(true, Ordering::Relaxed); } if caps.poller_autoscaling { self.capabilities @@ -678,7 +655,10 @@ impl Worker { let wf_sticky_last_suc_poll_time = Arc::new(AtomicCell::new(None)); let act_last_suc_poll_time = Arc::new(AtomicCell::new(None)); let nexus_last_suc_poll_time = Arc::new(AtomicCell::new(None)); - let capabilities = Arc::new(NamespaceCapabilities::default()); + let capabilities = Arc::new(NamespaceCapabilities { + graceful_poll_shutdown: AtomicBool::new(false), + poller_autoscaling: AtomicBool::new(false), + }); let nexus_slots = MeteredPermitDealer::new( tuner.nexus_task_slot_supplier(), @@ -1471,7 +1451,9 @@ impl Worker { tonic::Code::Unimplemented | tonic::Code::Unavailable ) => { - capabilities.set_graceful_poll_shutdown(false); + capabilities + .graceful_poll_shutdown + .store(false, Ordering::Relaxed); debug!( "shutdown_worker rpc unavailable during worker shutdown; \ falling back to local poll shutdown: {:?}", From a8f8b5685cf062dc8a433afdc1ac8f7cfed12f33 Mon Sep 17 00:00:00 2001 From: Benji Visser Date: Wed, 27 May 2026 13:58:33 -0400 Subject: [PATCH 3/3] Clarify graceful shutdown fallback log --- crates/sdk-core/src/worker/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/sdk-core/src/worker/mod.rs b/crates/sdk-core/src/worker/mod.rs index 27e19a796..0860d982b 100644 --- a/crates/sdk-core/src/worker/mod.rs +++ b/crates/sdk-core/src/worker/mod.rs @@ -1456,7 +1456,7 @@ impl Worker { .store(false, Ordering::Relaxed); debug!( "shutdown_worker rpc unavailable during worker shutdown; \ - falling back to local poll shutdown: {:?}", + disabling graceful poll shutdown and interrupting polls locally: {:?}", err ); }