Skip to content
Open
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion packages/core-bridge/sdk-core
Submodule sdk-core updated 70 files
+2 −2 .github/workflows/per-pr.yml
+9 −0 crates/client/src/raw.rs
+566 −39 crates/client/src/worker_registry/mod.rs
+7 −0 crates/common/src/errors.rs
+2 −2 crates/common/src/lib.rs
+84 −4 crates/common/src/worker.rs
+129 −0 crates/common/tests/worker_task_types_test.rs
+1 −0 crates/sdk-core-c-bridge/Cargo.toml
+8 −1 crates/sdk-core-c-bridge/include/temporal-sdk-core-c-bridge.h
+23 −2 crates/sdk-core-c-bridge/src/worker.rs
+2 −0 crates/sdk-core/Cargo.toml
+17 −4 crates/sdk-core/src/abstractions.rs
+60 −0 crates/sdk-core/src/antithesis.rs
+2 −2 crates/sdk-core/src/core_tests/activity_tasks.rs
+226 −4 crates/sdk-core/src/core_tests/workers.rs
+5 −5 crates/sdk-core/src/core_tests/workflow_tasks.rs
+20 −2 crates/sdk-core/src/lib.rs
+2 −2 crates/sdk-core/src/replay/mod.rs
+18 −0 crates/sdk-core/src/retry_logic.rs
+103 −15 crates/sdk-core/src/test_help/integ_helpers.rs
+2 −2 crates/sdk-core/src/worker/heartbeat.rs
+293 −188 crates/sdk-core/src/worker/mod.rs
+142 −32 crates/sdk-core/src/worker/workflow/machines/activity_state_machine.rs
+5 −5 crates/sdk-core/src/worker/workflow/machines/cancel_external_state_machine.rs
+3 −3 crates/sdk-core/src/worker/workflow/machines/cancel_nexus_op_state_machine.rs
+3 −3 crates/sdk-core/src/worker/workflow/machines/cancel_workflow_state_machine.rs
+28 −33 crates/sdk-core/src/worker/workflow/machines/child_workflow_state_machine.rs
+10 −4 crates/sdk-core/src/worker/workflow/machines/complete_workflow_state_machine.rs
+3 −3 crates/sdk-core/src/worker/workflow/machines/continue_as_new_workflow_state_machine.rs
+3 −3 crates/sdk-core/src/worker/workflow/machines/fail_workflow_state_machine.rs
+30 −26 crates/sdk-core/src/worker/workflow/machines/local_activity_state_machine.rs
+13 −12 crates/sdk-core/src/worker/workflow/machines/mod.rs
+18 −20 crates/sdk-core/src/worker/workflow/machines/nexus_operation_state_machine.rs
+9 −7 crates/sdk-core/src/worker/workflow/machines/patch_state_machine.rs
+6 −8 crates/sdk-core/src/worker/workflow/machines/signal_external_state_machine.rs
+23 −9 crates/sdk-core/src/worker/workflow/machines/timer_state_machine.rs
+14 −10 crates/sdk-core/src/worker/workflow/machines/update_state_machine.rs
+43 −54 crates/sdk-core/src/worker/workflow/machines/workflow_machines.rs
+8 −12 crates/sdk-core/src/worker/workflow/machines/workflow_task_state_machine.rs
+32 −19 crates/sdk-core/src/worker/workflow/managed_run.rs
+96 −16 crates/sdk-core/src/worker/workflow/mod.rs
+4 −3 crates/sdk-core/src/worker/workflow/run_cache.rs
+2 −2 crates/sdk-core/src/worker/workflow/workflow_stream.rs
+5 −4 crates/sdk-core/tests/common/mod.rs
+3 −1 crates/sdk-core/tests/heavy_tests.rs
+20 −5 crates/sdk-core/tests/integ_tests/metrics_tests.rs
+16 −5 crates/sdk-core/tests/integ_tests/update_tests.rs
+14 −14 crates/sdk-core/tests/integ_tests/worker_heartbeat_tests.rs
+6 −2 crates/sdk-core/tests/integ_tests/worker_tests.rs
+4 −2 crates/sdk-core/tests/integ_tests/worker_versioning_tests.rs
+8 −3 crates/sdk-core/tests/integ_tests/workflow_tests.rs
+4 −1 crates/sdk-core/tests/integ_tests/workflow_tests/cancel_external.rs
+4 −1 crates/sdk-core/tests/integ_tests/workflow_tests/cancel_wf.rs
+13 −4 crates/sdk-core/tests/integ_tests/workflow_tests/child_workflows.rs
+5 −2 crates/sdk-core/tests/integ_tests/workflow_tests/continue_as_new.rs
+4 −1 crates/sdk-core/tests/integ_tests/workflow_tests/determinism.rs
+7 −2 crates/sdk-core/tests/integ_tests/workflow_tests/eager.rs
+4 −1 crates/sdk-core/tests/integ_tests/workflow_tests/modify_wf_properties.rs
+31 −5 crates/sdk-core/tests/integ_tests/workflow_tests/nexus.rs
+18 −6 crates/sdk-core/tests/integ_tests/workflow_tests/patches.rs
+11 −2 crates/sdk-core/tests/integ_tests/workflow_tests/resets.rs
+14 −4 crates/sdk-core/tests/integ_tests/workflow_tests/signals.rs
+6 −4 crates/sdk-core/tests/integ_tests/workflow_tests/stickyness.rs
+14 −6 crates/sdk-core/tests/integ_tests/workflow_tests/timers.rs
+4 −1 crates/sdk-core/tests/integ_tests/workflow_tests/upsert_search_attrs.rs
+2 −1 crates/sdk-core/tests/manual_tests.rs
+4 −1 crates/sdk-core/tests/shared_tests/mod.rs
+4 −0 crates/sdk/Cargo.toml
+1 −2 docker-cgroup-tests.sh
+1 −1 etc/integ-with-otel.sh
31 changes: 23 additions & 8 deletions packages/core-bridge/src/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,12 +59,13 @@ pub struct Runtime {
pub fn runtime_new(
bridge_options: config::RuntimeOptions,
) -> BridgeResult<OpaqueOutboundHandle<Runtime>> {
let (telemetry_options, metrics_options, logging_options) = bridge_options.try_into()?;
let (telemetry_options, metrics_options, logging_options, worker_heartbeat_interval_millis) =
bridge_options.try_into()?;

// Create core runtime which starts tokio multi-thread runtime
let runtime_options = RuntimeOptionsBuilder::default()
.telemetry_options(telemetry_options)
.heartbeat_interval(None)
.heartbeat_interval(worker_heartbeat_interval_millis.map(Duration::from_millis))
.build()
.context("Failed to build runtime options")?;
let mut core_runtime = CoreRuntime::new(runtime_options, TokioRuntimeBuilder::default())
Expand Down Expand Up @@ -266,6 +267,7 @@ mod config {
log_exporter: LogExporterOptions,
telemetry: TelemetryOptions,
metrics_exporter: Option<MetricsExporterOptions>,
worker_heartbeat_interval_millis: Option<u64>,
}

#[derive(Debug, Clone, TryFromJs)]
Expand Down Expand Up @@ -322,6 +324,7 @@ mod config {
CoreTelemetryOptions,
Option<super::BridgeMetricsExporter>,
super::BridgeLogExporter,
Option<u64>,
)> for RuntimeOptions
{
type Error = BridgeError;
Expand All @@ -331,8 +334,16 @@ mod config {
CoreTelemetryOptions,
Option<super::BridgeMetricsExporter>,
super::BridgeLogExporter,
Option<u64>,
)> {
let (telemetry_logger, log_exporter) = match self.log_exporter {
let Self {
log_exporter,
telemetry,
metrics_exporter,
worker_heartbeat_interval_millis,
} = self;

let (telemetry_logger, log_exporter) = match log_exporter {
LogExporterOptions::Console { filter } => (
CoreTelemetryLogger::Console { filter },
BridgeLogExporter::Console,
Expand All @@ -352,17 +363,21 @@ mod config {
let mut telemetry_options = TelemetryOptionsBuilder::default();
let telemetry_options = telemetry_options
.logging(telemetry_logger)
.metric_prefix(self.telemetry.metric_prefix)
.attach_service_name(self.telemetry.attach_service_name)
.metric_prefix(telemetry.metric_prefix)
.attach_service_name(telemetry.attach_service_name)
.build()
.context("Failed to build telemetry options")?;

let metrics_exporter = self
.metrics_exporter
let metrics_exporter = metrics_exporter
.map(std::convert::TryInto::try_into)
.transpose()?;

Ok((telemetry_options, metrics_exporter, log_exporter))
Ok((
telemetry_options,
metrics_exporter,
log_exporter,
worker_heartbeat_interval_millis,
))
}
}

Expand Down
43 changes: 40 additions & 3 deletions packages/core-bridge/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,9 @@ pub fn worker_complete_workflow_activation(
),
}
}
CompleteWfError::WorkflowNotEnabled => {
BridgeError::UnexpectedError(err.to_string())
}
})
})
}
Expand Down Expand Up @@ -225,6 +228,9 @@ pub fn worker_complete_activity_task(
field: None,
message: format!("Malformed Activity Completion: {reason:?}"),
},
CompleteActivityError::ActivityNotEnabled => {
BridgeError::UnexpectedError(err.to_string())
}
})
})
}
Expand Down Expand Up @@ -296,7 +302,7 @@ pub fn worker_complete_nexus_task(
.await
.map_err(|err| match err {
CompleteNexusError::NexusNotEnabled => {
BridgeError::UnexpectedError(format!("{err}"))
BridgeError::UnexpectedError(err.to_string())
}
CompleteNexusError::MalformedNexusCompletion { reason } => BridgeError::TypeError {
field: None,
Expand Down Expand Up @@ -466,6 +472,7 @@ mod config {
use std::{sync::Arc, time::Duration};

use temporalio_common::protos::temporal::api::enums::v1::VersioningBehavior as CoreVersioningBehavior;
use temporalio_common::protos::temporal::api::worker::v1::PluginInfo;
use temporalio_common::worker::{
ActivitySlotKind, LocalActivitySlotKind, NexusSlotKind,
PollerBehavior as CorePollerBehavior, SlotKind, WorkerConfig, WorkerConfigBuilder,
Expand Down Expand Up @@ -499,14 +506,15 @@ mod config {
workflow_task_poller_behavior: PollerBehavior,
activity_task_poller_behavior: PollerBehavior,
nexus_task_poller_behavior: PollerBehavior,
enable_non_local_activities: bool,
task_types: WorkerTaskTypes,
sticky_queue_schedule_to_start_timeout: Duration,
max_cached_workflows: usize,
max_heartbeat_throttle_interval: Duration,
default_heartbeat_throttle_interval: Duration,
max_activities_per_second: Option<f64>,
max_task_queue_activities_per_second: Option<f64>,
shutdown_grace_time: Option<Duration>,
plugins: Vec<String>,
Copy link
Member

@cretz cretz Dec 4, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should consider deduping plugins if we don't already. Depending on plugin implementation, it may be very normal to configure multiple of the same plugin on the same worker (maybe each has different settings).

This should be done in every SDK IMO too if we don't already (could consider making that Vec a Set on the Core side).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good point, a set would ensure deduping. Created temporalio/sdk-core#1072

}

#[derive(TryFromJs)]
Expand Down Expand Up @@ -540,6 +548,26 @@ mod config {
AutoUpgrade,
}

#[derive(TryFromJs)]
#[allow(clippy::struct_excessive_bools)]
pub struct WorkerTaskTypes {
enable_workflows: bool,
enable_local_activities: bool,
enable_remote_activities: bool,
enable_nexus: bool,
}

impl From<&WorkerTaskTypes> for temporalio_common::worker::WorkerTaskTypes {
fn from(t: &WorkerTaskTypes) -> Self {
Self {
enable_workflows: t.enable_workflows,
enable_local_activities: t.enable_local_activities,
enable_remote_activities: t.enable_remote_activities,
enable_nexus: t.enable_nexus,
}
}
}

impl BridgeWorkerOptions {
pub(crate) fn into_core_config(self) -> Result<WorkerConfig, WorkerConfigBuilderError> {
// Set all other options
Expand All @@ -566,14 +594,23 @@ mod config {
.workflow_task_poller_behavior(self.workflow_task_poller_behavior)
.activity_task_poller_behavior(self.activity_task_poller_behavior)
.nexus_task_poller_behavior(self.nexus_task_poller_behavior)
.no_remote_activities(!self.enable_non_local_activities)
.task_types(&self.task_types)
.sticky_queue_schedule_to_start_timeout(self.sticky_queue_schedule_to_start_timeout)
.max_cached_workflows(self.max_cached_workflows)
.max_heartbeat_throttle_interval(self.max_heartbeat_throttle_interval)
.default_heartbeat_throttle_interval(self.default_heartbeat_throttle_interval)
.max_task_queue_activities_per_second(self.max_task_queue_activities_per_second)
.max_worker_activities_per_second(self.max_activities_per_second)
.graceful_shutdown_period(self.shutdown_grace_time)
.plugins(
self.plugins
.into_iter()
.map(|name| PluginInfo {
name,
version: String::new(),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just for my own knowledge, is version from the past or not yet implemented?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good question, I don't know. haha. The API was originally implemented with version, but not sure what the intent was. Will follow up on this.

})
.collect::<Vec<_>>(),
)
.build()
}
}
Expand Down
11 changes: 9 additions & 2 deletions packages/core-bridge/ts/native.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ export type JsonString<_T> = string;
// Runtime
////////////////////////////////////////////////////////////////////////////////////////////////////

export declare function newRuntime(telemOptions: RuntimeOptions): Runtime;
export declare function newRuntime(runtimeOptions: RuntimeOptions): Runtime;

export declare function runtimeShutdown(runtime: Runtime): void;

Expand All @@ -52,6 +52,7 @@ export type RuntimeOptions = {
logExporter: LogExporterOptions;
telemetry: TelemetryOptions;
metricsExporter: MetricExporterOptions;
workerHeartbeatIntervalMillis: Option<number>;
};

export type TelemetryOptions = {
Expand Down Expand Up @@ -213,14 +214,20 @@ export interface WorkerOptions {
workflowTaskPollerBehavior: PollerBehavior;
activityTaskPollerBehavior: PollerBehavior;
nexusTaskPollerBehavior: PollerBehavior;
enableNonLocalActivities: boolean;
taskTypes: {
enableWorkflows: boolean;
enableLocalActivities: boolean;
enableRemoteActivities: boolean;
enableNexus: boolean;
};
stickyQueueScheduleToStartTimeout: number;
maxCachedWorkflows: number;
maxHeartbeatThrottleInterval: number;
defaultHeartbeatThrottleInterval: number;
maxTaskQueueActivitiesPerSecond: Option<number>;
maxActivitiesPerSecond: Option<number>;
shutdownGraceTime: number;
plugins: string[];
}

export type PollerBehavior =
Expand Down
9 changes: 8 additions & 1 deletion packages/test/src/test-bridge.ts
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,7 @@ const GenericConfigs = {
attachServiceName: false,
},
metricsExporter: null,
workerHeartbeatIntervalMillis: null,
} satisfies native.RuntimeOptions,
},
client: {
Expand Down Expand Up @@ -298,14 +299,20 @@ const GenericConfigs = {
initial: 5,
maximum: 100,
},
enableNonLocalActivities: false,
taskTypes: {
enableWorkflows: true,
enableLocalActivities: false,
enableRemoteActivities: false,
enableNexus: false,
},
stickyQueueScheduleToStartTimeout: 1000,
maxCachedWorkflows: 1000,
maxHeartbeatThrottleInterval: 1000,
defaultHeartbeatThrottleInterval: 1000,
maxTaskQueueActivitiesPerSecond: null,
maxActivitiesPerSecond: null,
shutdownGraceTime: 1000,
plugins: [],
} satisfies native.WorkerOptions,
},
ephemeralServer: {
Expand Down
21 changes: 21 additions & 0 deletions packages/test/src/test-local-activities.ts
Original file line number Diff line number Diff line change
Expand Up @@ -657,3 +657,24 @@ test.serial('retryPolicy is set correctly', async (t) => {
t.deepEqual(await executeWorkflow(getRetryPolicyFromActivityInfo, { args: [retryPolicy, false] }), retryPolicy);
});
});

export async function runLocalActivityWithNonLocalActivitiesDisabled(): Promise<string> {
const { echo } = workflow.proxyLocalActivities({ startToCloseTimeout: '1m' });
return await echo('hello from local activity');
}

test.serial('Local activities work when enableNonLocalActivities is false', async (t) => {
const { executeWorkflow, createWorker } = helpers(t);
const worker = await createWorker({
activities: {
async echo(message: string): Promise<string> {
return message;
},
},
enableNonLocalActivities: false,
});
await worker.runUntil(async () => {
const result = await executeWorkflow(runLocalActivityWithNonLocalActivitiesDisabled);
t.is(result, 'hello from local activity');
});
});
20 changes: 18 additions & 2 deletions packages/worker/src/runtime-options.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,14 @@ export interface RuntimeOptions {
*/
telemetryOptions?: TelemetryOptions;

/**
* Interval for worker heartbeats. `null` disables heartbeating.
*
* @format number of milliseconds or {@link https://www.npmjs.com/package/ms | ms-formatted string}
* @default 60000 (60 seconds)
*/
workerHeartbeatInterval?: Duration | null;

/**
* Automatically shutdown workers on any of these signals.
*
Expand Down Expand Up @@ -359,18 +367,25 @@ export interface PrometheusMetricsExporter {
*/
export interface CompiledRuntimeOptions {
shutdownSignals: NodeJS.Signals[];
telemetryOptions: native.RuntimeOptions;
runtimeOptions: native.RuntimeOptions;
logger: Logger;
}

export function compileOptions(options: RuntimeOptions): CompiledRuntimeOptions {
const { metrics, noTemporalPrefixForMetrics } = options.telemetryOptions ?? {}; // eslint-disable-line deprecation/deprecation
const [logger, logExporter] = compileLoggerOptions(options);

let workerHeartbeatIntervalMillis: number | null;
if (options.workerHeartbeatInterval === null) {
workerHeartbeatIntervalMillis = null;
} else {
workerHeartbeatIntervalMillis = msToNumber(options.workerHeartbeatInterval ?? '60s');
}

return {
logger,
shutdownSignals: options.shutdownSignals ?? ['SIGINT', 'SIGTERM', 'SIGQUIT', 'SIGUSR2'],
telemetryOptions: {
runtimeOptions: {
logExporter,
telemetry: {
metricPrefix: metrics?.metricPrefix ?? (noTemporalPrefixForMetrics ? '' : 'temporal_'),
Expand Down Expand Up @@ -400,6 +415,7 @@ export function compileOptions(options: RuntimeOptions): CompiledRuntimeOptions
globalTags: metrics.globalTags ?? {},
} satisfies native.MetricExporterOptions)
: null,
workerHeartbeatIntervalMillis,
},
};
}
Expand Down
4 changes: 2 additions & 2 deletions packages/worker/src/runtime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ export class Runtime {
public readonly options: CompiledRuntimeOptions
) {
this.logger = options.logger;
this.metricMeter = options.telemetryOptions.metricsExporter
this.metricMeter = options.runtimeOptions.metricsExporter
? MetricMeterWithComposedTags.compose(new RuntimeMetricMeter(this.native), {}, true)
: noopMetricMeter;

Expand Down Expand Up @@ -97,7 +97,7 @@ export class Runtime {
*/
protected static create(options: RuntimeOptions, instantiator: 'install' | 'instance'): Runtime {
const compiledOptions = compileOptions(options);
const runtime = native.newRuntime(compiledOptions.telemetryOptions);
const runtime = native.newRuntime(compiledOptions.runtimeOptions);

// Remember the provided options in case Core is reinstantiated after being shut down
this.defaultOptions = options;
Expand Down
10 changes: 9 additions & 1 deletion packages/worker/src/worker-options.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1078,6 +1078,8 @@ function nexusServiceRegistryFromOptions(opts: WorkerOptions): nexus.ServiceRegi
}

export function toNativeWorkerOptions(opts: CompiledWorkerOptionsWithBuildId): native.WorkerOptions {
const enableWorkflows = opts.workflowBundle !== undefined || opts.workflowsPath !== undefined;
const enableLocalActivities = enableWorkflows && opts.activities.size > 0;
return {
identity: opts.identity,
buildId: opts.buildId, // eslint-disable-line deprecation/deprecation
Expand All @@ -1090,14 +1092,20 @@ export function toNativeWorkerOptions(opts: CompiledWorkerOptionsWithBuildId): n
workflowTaskPollerBehavior: toNativeTaskPollerBehavior(opts.workflowTaskPollerBehavior),
activityTaskPollerBehavior: toNativeTaskPollerBehavior(opts.activityTaskPollerBehavior),
nexusTaskPollerBehavior: toNativeTaskPollerBehavior(opts.nexusTaskPollerBehavior),
enableNonLocalActivities: opts.enableNonLocalActivities,
taskTypes: {
enableWorkflows,
enableLocalActivities,
enableRemoteActivities: opts.enableNonLocalActivities && opts.activities.size > 0,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's basically saying:

enableRemoteActivities: (enableWorkflows && opts.activities.size > 0) && opts.activities.size > 0

Which is incorrect. We don't need workflows to enable remote activities. I'm surprised tests passed with this.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think that's right, you're referring to enableLocalActivities, but I'm using opts.enableNonLocalActivities here

enableNexus: opts.nexusServiceRegistry !== undefined,
},
stickyQueueScheduleToStartTimeout: msToNumber(opts.stickyQueueScheduleToStartTimeout),
maxCachedWorkflows: opts.maxCachedWorkflows,
maxHeartbeatThrottleInterval: msToNumber(opts.maxHeartbeatThrottleInterval),
defaultHeartbeatThrottleInterval: msToNumber(opts.defaultHeartbeatThrottleInterval),
maxTaskQueueActivitiesPerSecond: opts.maxTaskQueueActivitiesPerSecond ?? null,
maxActivitiesPerSecond: opts.maxActivitiesPerSecond ?? null,
shutdownGraceTime: msToNumber(opts.shutdownGraceTime),
plugins: opts.plugins?.map((p) => p.name) ?? [],
};
}

Expand Down
Loading