diff --git a/crates/client/Cargo.toml b/crates/client/Cargo.toml index 147f4b604..7b538a67d 100644 --- a/crates/client/Cargo.toml +++ b/crates/client/Cargo.toml @@ -42,7 +42,6 @@ tracing = "0.1" url = "2.5" uuid = { version = "1.18", features = ["v4"] } rand = "0.10" -serde_json = { workspace = true } [dependencies.temporalio-common] path = "../common" diff --git a/crates/client/src/workflow_handle.rs b/crates/client/src/workflow_handle.rs index 441ccc30b..4628782a1 100644 --- a/crates/client/src/workflow_handle.rs +++ b/crates/client/src/workflow_handle.rs @@ -12,9 +12,11 @@ use std::{fmt::Debug, marker::PhantomData}; pub use temporalio_common::UntypedWorkflow; use temporalio_common::{ HasWorkflowDefinition, QueryDefinition, SignalDefinition, UpdateDefinition, WorkflowDefinition, - data_converters::{RawValue, SerializationContextData}, + data_converters::{DataConverter, PayloadConversionError, RawValue, SerializationContextData}, + payload_visitor::decode_payloads, protos::{ coresdk::FromPayloadsExt, + proto_ts_to_system_time, temporal::api::{ common::v1::{Payload, Payloads, WorkflowExecution as ProtoWorkflowExecution}, enums::v1::{HistoryEventFilterType, UpdateWorkflowExecutionLifecycleStage}, @@ -24,7 +26,9 @@ use temporalio_common::{ v1::{HistoryEvent, history_event::Attributes}, }, query::v1::WorkflowQuery, + sdk::v1::UserMetadata, update::{self, v1::WaitPolicy}, + workflow::v1 as workflow, workflowservice::v1::{ DescribeWorkflowExecutionRequest, DescribeWorkflowExecutionResponse, GetWorkflowExecutionHistoryRequest, PollWorkflowExecutionUpdateRequest, @@ -38,6 +42,32 @@ use temporalio_common::{ use tonic::IntoRequest; use uuid::Uuid; +#[derive(Debug, Clone, Default, PartialEq, Eq)] +struct DecodedUserMetadata { + summary: Option, + details: Option, +} + +async fn decode_user_metadata( + data_converter: &DataConverter, + context: &SerializationContextData, + user_metadata: Option, +) -> Result { + let (summary, details) = user_metadata + .map(|metadata| (metadata.summary, metadata.details)) + .unwrap_or_default(); + Ok(DecodedUserMetadata { + summary: match summary { + Some(payload) => Some(data_converter.from_payload(context, payload).await?), + None => None, + }, + details: match details { + Some(payload) => Some(data_converter.from_payload(context, payload).await?), + None => None, + }, + }) +} + /// Enumerates terminal states for a particular workflow execution #[derive(Debug)] #[allow(clippy::large_enum_variant)] @@ -63,43 +93,182 @@ pub enum WorkflowExecutionResult { } /// Description of a workflow execution returned by `WorkflowHandle::describe`. +/// +/// Access to the underlying Protobuf message is provided by [`raw`](Self::raw). #[derive(Debug, Clone)] pub struct WorkflowExecutionDescription { /// The raw proto response from the server. pub raw_description: DescribeWorkflowExecutionResponse, + history_length: usize, + static_summary: Option, + static_details: Option, } impl WorkflowExecutionDescription { - fn new(raw_description: DescribeWorkflowExecutionResponse) -> Self { - Self { raw_description } + async fn new( + mut raw_description: DescribeWorkflowExecutionResponse, + data_converter: &DataConverter, + ) -> Result { + let raw_user_metadata = raw_description + .execution_config + .as_ref() + .and_then(|cfg| cfg.user_metadata.clone()); + decode_payloads( + &mut raw_description, + data_converter.codec(), + &SerializationContextData::Workflow, + ) + .await; + let decoded_metadata = decode_user_metadata( + data_converter, + &SerializationContextData::Workflow, + raw_user_metadata, + ) + .await?; + let history_length_raw = raw_description + .workflow_execution_info + .as_ref() + .map(|info| info.history_length) + .unwrap_or(0); + let history_length = history_length_raw.try_into().map_err(|_| { + PayloadConversionError::EncodingError( + format!("workflow history_length must be non-negative, got {history_length_raw}") + .into(), + ) + })?; + Ok(Self { + raw_description, + history_length, + static_summary: decoded_metadata.summary, + static_details: decoded_metadata.details, + }) } - /// The static summary set when the workflow was started, if any. - // TOOD: Use DataConverter to avoid direct dependency on serde_json - pub fn static_summary(&self) -> Option { - let payload = self - .raw_description - .execution_config - .as_ref()? - .user_metadata - .as_ref()? - .summary - .as_ref()?; - serde_json::from_slice(&payload.data).ok() - } - - /// The static details set when the workflow was started, if any. - // TOOD: Use DataConverter to avoid direct dependency on serde_json - pub fn static_details(&self) -> Option { - let payload = self - .raw_description - .execution_config - .as_ref()? - .user_metadata - .as_ref()? - .details - .as_ref()?; - serde_json::from_slice(&payload.data).ok() + /// The workflow ID. + pub fn id(&self) -> &str { + self.execution().workflow_id.as_str() + } + + /// The run ID. + pub fn run_id(&self) -> &str { + self.execution().run_id.as_str() + } + + /// The workflow type name. + pub fn workflow_type(&self) -> &str { + self.workflow_type_info().name.as_str() + } + + /// The current status of the workflow execution. + pub fn status( + &self, + ) -> temporalio_common::protos::temporal::api::enums::v1::WorkflowExecutionStatus { + self.workflow_info().status() + } + + /// When the workflow was created. + pub fn start_time(&self) -> Option { + self.workflow_info() + .start_time + .as_ref() + .and_then(proto_ts_to_system_time) + } + + /// When the workflow run started or should start. + pub fn execution_time(&self) -> Option { + self.workflow_info() + .execution_time + .as_ref() + .and_then(proto_ts_to_system_time) + } + + /// When the workflow was closed, if closed. + pub fn close_time(&self) -> Option { + self.workflow_info() + .close_time + .as_ref() + .and_then(proto_ts_to_system_time) + } + + /// The task queue the workflow runs on. + pub fn task_queue(&self) -> &str { + self.workflow_info().task_queue.as_str() + } + + /// Number of events in history. + pub fn history_length(&self) -> usize { + self.history_length + } + + /// Workflow memo after codec decoding. + pub fn memo(&self) -> Option<&temporalio_common::protos::temporal::api::common::v1::Memo> { + self.workflow_info().memo.as_ref() + } + + /// Parent workflow ID, if this is a child workflow. + pub fn parent_id(&self) -> Option<&str> { + self.workflow_info() + .parent_execution + .as_ref() + .map(|e| e.workflow_id.as_str()) + } + + /// Parent run ID, if this is a child workflow. + pub fn parent_run_id(&self) -> Option<&str> { + self.workflow_info() + .parent_execution + .as_ref() + .map(|e| e.run_id.as_str()) + } + + /// Search attributes on the workflow. + pub fn search_attributes( + &self, + ) -> Option<&temporalio_common::protos::temporal::api::common::v1::SearchAttributes> { + self.workflow_info().search_attributes.as_ref() + } + + /// Static summary configured on the workflow, if present. + pub fn static_summary(&self) -> Option<&str> { + self.static_summary.as_deref() + } + + /// Static details configured on the workflow, if present. + pub fn static_details(&self) -> Option<&str> { + self.static_details.as_deref() + } + + /// Access the raw proto for additional fields not exposed via accessors. + pub fn raw(&self) -> &DescribeWorkflowExecutionResponse { + &self.raw_description + } + + /// Consume the wrapper and return the raw proto. + pub fn into_raw(self) -> DescribeWorkflowExecutionResponse { + self.raw_description + } + + fn workflow_info(&self) -> &workflow::WorkflowExecutionInfo { + self.raw_description + .workflow_execution_info + .as_ref() + .expect("describe response missing workflow_execution_info") + } + + fn execution(&self) -> &ProtoWorkflowExecution { + self.workflow_info() + .execution + .as_ref() + .expect("describe response missing workflow_execution_info.execution") + } + + fn workflow_type_info( + &self, + ) -> &temporalio_common::protos::temporal::api::common::v1::WorkflowType { + self.workflow_info() + .r#type + .as_ref() + .expect("describe response missing workflow_execution_info.type") } } @@ -689,7 +858,9 @@ where .await .map_err(WorkflowInteractionError::from_status)? .into_inner(); - Ok(WorkflowExecutionDescription::new(response)) + WorkflowExecutionDescription::new(response, self.client.data_converter()) + .await + .map_err(WorkflowInteractionError::from) } /// Fetch workflow execution history. pub async fn fetch_history( @@ -841,3 +1012,116 @@ where } } } + +#[cfg(test)] +mod tests { + use super::*; + use std::collections::HashMap; + use temporalio_common::protos::temporal::api::{ + common::v1::{Memo, SearchAttributes}, + enums::v1::WorkflowExecutionStatus, + sdk::v1::UserMetadata, + workflow::v1::WorkflowExecutionConfig, + }; + + #[tokio::test] + async fn workflow_description_accessors_expose_decoded_fields() { + let converter = DataConverter::default(); + let memo_payload = converter + .to_payload(&SerializationContextData::Workflow, &"memo-value") + .await + .unwrap(); + let search_attr_payload = converter + .to_payload(&SerializationContextData::Workflow, &"search-value") + .await + .unwrap(); + let summary_payload = converter + .to_payload(&SerializationContextData::Workflow, &"workflow summary") + .await + .unwrap(); + let details_payload = converter + .to_payload(&SerializationContextData::Workflow, &"workflow details") + .await + .unwrap(); + let description = WorkflowExecutionDescription::new( + DescribeWorkflowExecutionResponse { + workflow_execution_info: Some(workflow::WorkflowExecutionInfo { + execution: Some(ProtoWorkflowExecution { + workflow_id: "wf-id".to_string(), + run_id: "run-id".to_string(), + }), + r#type: Some( + temporalio_common::protos::temporal::api::common::v1::WorkflowType { + name: "wf-type".to_string(), + }, + ), + status: WorkflowExecutionStatus::Completed as i32, + task_queue: "task-queue".to_string(), + history_length: 42, + memo: Some(Memo { + fields: HashMap::from([("memo-key".to_string(), memo_payload.clone())]), + }), + parent_execution: Some(ProtoWorkflowExecution { + workflow_id: "parent-id".to_string(), + run_id: "parent-run-id".to_string(), + }), + search_attributes: Some(SearchAttributes { + indexed_fields: HashMap::from([( + "CustomKeywordField".to_string(), + search_attr_payload.clone(), + )]), + }), + ..Default::default() + }), + execution_config: Some(WorkflowExecutionConfig { + user_metadata: Some(UserMetadata { + summary: Some(summary_payload), + details: Some(details_payload), + }), + ..Default::default() + }), + ..Default::default() + }, + &converter, + ) + .await + .unwrap(); + + assert_eq!(description.id(), "wf-id"); + assert_eq!(description.run_id(), "run-id"); + assert_eq!(description.workflow_type(), "wf-type"); + assert_eq!(description.status(), WorkflowExecutionStatus::Completed); + assert_eq!(description.task_queue(), "task-queue"); + assert_eq!(description.history_length(), 42); + assert_eq!(description.parent_id(), Some("parent-id")); + assert_eq!(description.parent_run_id(), Some("parent-run-id")); + assert_eq!(description.memo().unwrap().fields["memo-key"], memo_payload); + assert_eq!( + description.search_attributes().unwrap().indexed_fields["CustomKeywordField"], + search_attr_payload + ); + assert_eq!(description.static_summary(), Some("workflow summary")); + assert_eq!(description.static_details(), Some("workflow details")); + } + + #[tokio::test] + async fn workflow_description_rejects_negative_history_length() { + let err = WorkflowExecutionDescription::new( + DescribeWorkflowExecutionResponse { + workflow_execution_info: Some(workflow::WorkflowExecutionInfo { + history_length: -1, + ..Default::default() + }), + ..Default::default() + }, + &DataConverter::default(), + ) + .await + .unwrap_err(); + + assert_eq!( + err.to_string(), + "Encoding error: workflow history_length must be non-negative, got -1" + ); + } +} diff --git a/crates/sdk-core/tests/integ_tests/data_converter_tests.rs b/crates/sdk-core/tests/integ_tests/data_converter_tests.rs index 5b1a1442c..0f90147a7 100644 --- a/crates/sdk-core/tests/integ_tests/data_converter_tests.rs +++ b/crates/sdk-core/tests/integ_tests/data_converter_tests.rs @@ -7,14 +7,19 @@ use std::{ }, time::Duration, }; -use temporalio_client::{Client, ClientOptions, UntypedWorkflow, WorkflowStartOptions}; +use temporalio_client::{ + Client, ClientOptions, UntypedWorkflow, WorkflowDescribeOptions, WorkflowStartOptions, +}; use temporalio_common::{ data_converters::{ DataConverter, DefaultFailureConverter, MultiArgs2, PayloadCodec, PayloadConversionError, PayloadConverter, SerializationContext, SerializationContextData, TemporalDeserializable, TemporalSerializable, }, - protos::temporal::api::{common::v1::Payload, history::v1::history_event::Attributes}, + protos::{ + coresdk::AsJsonPayloadExt, + temporal::api::{common::v1::Payload, history::v1::history_event::Attributes}, + }, worker::WorkerTaskTypes, }; use temporalio_macros::{activities, workflow, workflow_methods}; @@ -112,6 +117,33 @@ impl DataConverterTestWorkflow { } } +#[workflow] +#[derive(Default)] +struct DescribeDataConverterWorkflow; +#[workflow_methods] +impl DescribeDataConverterWorkflow { + #[run] + async fn run( + ctx: &mut WorkflowContext, + input: TrackedWrapper, + ) -> WorkflowResult { + ctx.upsert_memo([("tracked".to_string(), input.0.data.as_json_payload()?)]); + let output = ctx + .start_activity( + TestActivities::process_tracked, + input, + ActivityOptions { + start_to_close_timeout: Some(Duration::from_secs(5)), + ..Default::default() + }, + ) + .await + .map_err(|e| anyhow::anyhow!("{e}"))?; + + Ok(output) + } +} + #[tokio::test] async fn data_converter_tracks_serialization_points() { let wf_name = DataConverterTestWorkflow::name(); @@ -379,3 +411,59 @@ async fn codec_encodes_and_decodes_payloads() { "Codec should have decoded payloads, but decode_count was 0" ); } + +#[tokio::test] +async fn describe_decodes_workflow_payload_fields() { + let wf_name = DescribeDataConverterWorkflow::name(); + let codec = Arc::new(XorCodec::new(0x42)); + + let connection = get_integ_connection(None).await; + let data_converter = DataConverter::new( + PayloadConverter::default(), + DefaultFailureConverter, + codec.clone(), + ); + let client_opts = ClientOptions::new(integ_namespace()) + .data_converter(data_converter) + .build(); + let client = Client::new(connection, client_opts).unwrap(); + + let mut starter = CoreWfStarter::new_with_overrides(wf_name, None, Some(client)); + starter.sdk_config.register_activities(TestActivities); + starter.sdk_config.task_types = WorkerTaskTypes::all(); + starter + .sdk_config + .register_workflow::(); + let wf_id = starter.get_task_queue().to_owned(); + let mut worker = starter.worker().await; + + let handle = worker + .submit_workflow( + DescribeDataConverterWorkflow::run, + TrackedWrapper(TrackedValue::new("codec-describe".to_string())), + WorkflowStartOptions::new(starter.get_task_queue(), wf_id) + .static_summary("codec summary") + .static_details("codec details") + .build(), + ) + .await + .unwrap(); + worker.run_until_done().await.unwrap(); + + let decode_count_before = codec.decode_count(); + let desc = handle + .describe(WorkflowDescribeOptions::default()) + .await + .unwrap(); + + assert!( + codec.decode_count() > decode_count_before, + "Describe should have decoded response payloads" + ); + assert_eq!( + desc.memo().unwrap().fields["tracked"], + "codec-describe".as_json_payload().unwrap() + ); + assert_eq!(desc.static_summary(), Some("codec summary")); + assert_eq!(desc.static_details(), Some("codec details")); +}