Good Practices for Writing Temporal Workflows and Activities

Introduction

Temporal

Temporal is a powerful workflow orchestration platform that helps you build resilient distributed applications. The kicker with Temporal is that you can write applications much as you usually would, but by wrapping your function calls with specific SDK functions, you get a highly distributed and fault-tolerant application.

The way I usually explain it to people is this: when you write and execute a regular application, it runs locally on a few CPUs and gets orchestrated by the operating system. As a former colleague of mine used to say, Temporal is a distributed operating system. Your CPUs are your workers, and the Temporal server is the orchestrator.

Once you’re used to Temporal, everything is a workflow, and it makes so much sense. Web endpoint? A workflow. Background job? A workflow. Long-running process? A workflow. Manually-triggered operation? A workflow. Scheduled job? A workflow. Runbooks? A workflow. Deployments? A workflow. Everything is a workflow.

Temporal will take care of the retries, the state management, and the orchestration of your application. It will also help you with the observability of your application by providing a web UI to visualize the execution of your workflows.

Why this post?

Now, there are a few rules you need to follow when writing workflows and activities, and some of them are not intuitive at first. To put things in perspective, I wrote a similar version of this document in 2024 while I was working at Temporal, for use by internal teams. As I switched roles and companies, I realized that the same recommendations could be useful to my new colleagues, and as such, that they could be useful at large.

Learning Resources

For newcomers to Temporal, I recommend following the Temporal courses to get started before even reading the rest of this post.


Core Principles

Input, Output

Good Practice
Always have exactly one input value and one output value for both workflows and activities.

When writing workflows and activities, it might be tempting to pass multiple parameters or return multiple values. However, this approach can make your code harder to evolve over time. Instead, wrap all your inputs and outputs in a single object.

For example, instead of this:

func ProcessOrderWorkflow(ctx workflow.Context,
    orderID string,
    customerID string,
    items []Item,
    shippingAddress Address) (OrderStatus, string, time.Time, error) {
    // ...
}
@workflow.defn
class ProcessOrderWorkflow:
    @workflow.run
    async def run(
        self,
        order_id: str,
        customer_id: str,
        items: list[Item],
        shipping_address: Address,
    ) -> tuple[OrderStatus, str, datetime]:
        # ...
        pass
export async function processOrderWorkflow(
  orderID: string,
  customerID: string,
  items: Item[],
  shippingAddress: Address
): Promise<[OrderStatus, string, Date]> {
  // ...
}

Do this:

type ProcessOrderInput struct {
    OrderID         string
    CustomerID      string
    Items           []Item
    ShippingAddress Address
}

type ProcessOrderOutput struct {
    Status            OrderStatus
    TrackingNumber    string
    EstimatedDelivery time.Time
}

func ProcessOrderWorkflow(ctx workflow.Context, input ProcessOrderInput) (ProcessOrderOutput, error) {
    // ...
}
@dataclass
class ProcessOrderInput:
    order_id: str
    customer_id: str
    items: list[Item]
    shipping_address: Address

@dataclass
class ProcessOrderOutput:
    status: OrderStatus
    tracking_number: str
    estimated_delivery: datetime

@workflow.defn
class ProcessOrderWorkflow:
    @workflow.run
    async def run(self, args: dict) -> ProcessOrderOutput:
        # For Python code, I usually rely on a validation function
        # so that input conversion does not raise cryptic, Temporal-internal
        # errors when the type does not exactly match the input.
        #
        # See below for the implementation of `validate_input`
        inp = validate_input(args, ProcessOrderInput)
        workflow.logger.info("Running workflow with input %s", inp)
        # ...
import typing
import datetime

from temporalio.exceptions import ApplicationError


def validate_input(args: dict, expected_type: type) -> typing.Any:
    # This validates the input in a way that does not trigger
    # retries when the input is invalid, as it will probably
    # stay invalid
    try:
        # We need to go over all the args so that if one has an expected
        # type, we can convert it to the expected type using a recursive
        # call to validate_input. We also can use this to convert other
        # complex types that would be represented as another type in JSON
        # (e.g. datetime objects)
        for k, v in args.items():
            if isinstance(v, dict):
                sub_expected_type = expected_type.__annotations__.get(k)
                if sub_expected_type and hasattr(sub_expected_type, "__annotations__"):
                    args[k] = validate_input(v, sub_expected_type)
            elif isinstance(v, str):
                sub_expected_type = expected_type.__annotations__.get(k)
                if sub_expected_type == datetime.datetime:
                    args[k] = datetime.datetime.fromisoformat(v)

        return expected_type(**args)
    except Exception as e:
        raise ApplicationError(f"Invalid input: {e}", non_retryable=True)
interface ProcessOrderInput {
  orderID: string;
  customerID: string;
  items: Item[];
  shippingAddress: Address;
}

interface ProcessOrderOutput {
  status: OrderStatus;
  trackingNumber: string;
  estimatedDelivery: Date;
}

export async function processOrderWorkflow(
  input: ProcessOrderInput
): Promise<ProcessOrderOutput> {
  // ...
}

This pattern makes it easier to:

  • Add new parameters without breaking existing code
  • Remove deprecated parameters while maintaining backward compatibility
  • Understand what each parameter means (named vs positional)
  • Version your workflows and activities safely

Evolving Temporal Workflows and Activities is a bit like evolving an API: you want to make sure older versions still work until the newer version is fully adopted. This approach thus allows you to add fields to your input/output objects without breaking existing clients (currently running Workflows and Activities), and you can deprecate older fields and remove them once all your clients have upgraded.
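
For example, here is a minimal sketch of what that evolution might look like on the Go input struct above (the CouponCode and LegacyDiscount fields are hypothetical):

type ProcessOrderInput struct {
    OrderID         string
    CustomerID      string
    Items           []Item
    ShippingAddress Address

    // Added in a later version: older callers and already-running workflows
    // never send it, so the workflow must tolerate its zero value.
    CouponCode string

    // Deprecated: still accepted so that in-flight workflows and older
    // callers keep working; remove it once everything has been upgraded.
    LegacyDiscount float64
}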

Naming Conventions

Good Practice
Use specific, descriptive names for Activities and Workflows that encompass their full function.

When naming Activities and Workflows, it’s tempting to use generic names like Create, Update, or Delete. However, these names can quickly conflict as your codebase grows. Instead, aim for descriptive names that fully capture what the Activity or Workflow does.

For example, prefer names like CreateAwsCluster or UpdateAwsElasticLoadBalancer over generic Create or Update functions. An even better approach is to use package-prefixed namespaces, which creates a natural hierarchy, such as AwsClusterCreate, or even better aws.cluster.Create.

// Instead of this approach (generic name, registered as is)
func Create(ctx context.Context, input CreateInput) (CreateOutput, error) {
    // Creates an AWS cluster
}

func RegisterActivities(worker worker.Worker) {
    // Registered with the generic function name "Create"
    worker.RegisterActivity(Create)
}

// Better approach: use namespaced registration name
func CreateCluster(ctx context.Context, input CreateInput) (CreateOutput, error) {
    // Implementation to create an AWS cluster
}

func RegisterActivities(worker worker.Worker) {
    // The function name doesn't matter - the registration name does
    worker.RegisterActivityWithOptions(
        CreateCluster,
        activity.RegisterOptions{Name: "aws.cluster.Create"},
    )
}
# Instead of this
@activity.defn
def create(input: CreateInput) -> CreateOutput:
    # Creates an AWS cluster
    pass

# Prefer this
@activity.defn
def create_aws_cluster(input: CreateAwsClusterInput) -> CreateAwsClusterOutput:
    # ...
    pass

# Or even better, use module structure with explicit naming
# aws/cluster.py
@activity.defn(name="aws.cluster.Create")
def create(input: CreateInput) -> CreateOutput:
    # Now registered as aws.cluster.Create
    pass
import { Worker } from '@temporalio/worker';

// Instead of this approach (generic name, registered as is)
export async function create(input: CreateInput): Promise<CreateOutput> {
  // Creates an AWS cluster
}

// When creating the worker, the keys of the activities object become the
// registered activity names:
await Worker.create({
  taskQueue: 'your-task-queue',
  activities: {
    create, // Gets registered as "create"
  },
});

// Better approach: use a namespaced registration name
export async function createCluster(input: CreateInput): Promise<CreateOutput> {
  // Implementation to create an AWS cluster
}

await Worker.create({
  taskQueue: 'your-task-queue',
  activities: {
    // The function name doesn't matter - the registration name does
    'aws.cluster.Create': createCluster,
  },
});

// You can also organize related functions in objects
export const awsActivities = {
  cluster: {
    async create(input: CreateInput): Promise<CreateOutput> {
      // AWS cluster creation implementation
    },
  },
};

await Worker.create({
  taskQueue: 'your-task-queue',
  activities: {
    'aws.cluster.Create': awsActivities.cluster.create,
  },
});

For systems that may eventually need to support multiple cloud providers, consider designing your Workflows with cloud-agnosticism in mind from the start. You can create higher-level Workflows that serve as an indirection layer, taking a cloud provider parameter and delegating to the appropriate implementation:

// Cloud-specific implementation
func CreateAwsCluster(ctx workflow.Context, input CreateAwsClusterInput) (CreateAwsClusterOutput, error) {
    // AWS-specific implementation
}

func CreateGcpCluster(ctx workflow.Context, input CreateGcpClusterInput) (CreateGcpClusterOutput, error) {
    // GCP-specific implementation
}

// Cloud-agnostic interface
type CloudProvider string
const (
    AWS CloudProvider = "aws"
    GCP CloudProvider = "gcp"
)

type CreateClusterInput struct {
    Provider CloudProvider
    // Common parameters
    Name string
    Size int
    // Provider-specific parameters
    ProviderParams map[string]interface{}
}

// Cloud-agnostic workflow
func CreateCluster(ctx workflow.Context, input CreateClusterInput) (CreateClusterOutput, error) {
    var output CreateClusterOutput
    switch input.Provider {
    case AWS:
        awsInput := convertToAwsInput(input)
        err := workflow.ExecuteChildWorkflow(ctx, CreateAwsCluster, awsInput).Get(ctx, &output)
        return output, err
    case GCP:
        gcpInput := convertToGcpInput(input)
        err := workflow.ExecuteChildWorkflow(ctx, CreateGcpCluster, gcpInput).Get(ctx, &output)
        return output, err
    default:
        return CreateClusterOutput{}, fmt.Errorf("unsupported provider: %s", input.Provider)
    }
}
@workflow.defn
class CreateAwsCluster:
    @workflow.run
    async def run(self, input: CreateAwsClusterInput) -> CreateAwsClusterOutput:
        # AWS-specific implementation
        pass

@workflow.defn
class CreateGcpCluster:
    @workflow.run
    async def run(self, input: CreateGcpClusterInput) -> CreateGcpClusterOutput:
        # GCP-specific implementation
        pass

# Cloud provider enum
class CloudProvider(str, Enum):
    AWS = "aws"
    GCP = "gcp"

@dataclass
class CreateClusterInput:
    provider: CloudProvider
    # Common parameters
    name: str
    size: int
    # Provider-specific parameters
    provider_params: dict

@workflow.defn
class CreateCluster:
    @workflow.run
    async def run(self, input: CreateClusterInput) -> CreateClusterOutput:
        """Cloud-agnostic workflow that delegates to provider-specific workflows"""
        if input.provider == CloudProvider.AWS:
            aws_input = convert_to_aws_input(input)
            return await workflow.execute_child_workflow(
                CreateAwsCluster.run, aws_input
            )
        elif input.provider == CloudProvider.GCP:
            gcp_input = convert_to_gcp_input(input)
            return await workflow.execute_child_workflow(
                CreateGcpCluster.run, gcp_input
            )
        else:
            # Fail the workflow explicitly on invalid input instead of
            # retrying the workflow task forever
            raise ApplicationError(
                f"Unsupported provider: {input.provider}", non_retryable=True
            )
// Cloud-specific implementations
export async function createAwsCluster(input: CreateAwsClusterInput): Promise<CreateAwsClusterOutput> {
  // AWS-specific implementation
}

export async function createGcpCluster(input: CreateGcpClusterInput): Promise<CreateGcpClusterOutput> {
  // GCP-specific implementation
}

// Cloud provider enum
export enum CloudProvider {
  AWS = 'aws',
  GCP = 'gcp',
}

interface CreateClusterInput {
  provider: CloudProvider;
  // Common parameters
  name: string;
  size: number;
  // Provider-specific parameters
  providerParams: Record<string, unknown>;
}

// Cloud-agnostic workflow (executeChild comes from '@temporalio/workflow')
export async function createCluster(input: CreateClusterInput): Promise<CreateClusterOutput> {
  switch (input.provider) {
    case CloudProvider.AWS: {
      const awsInput = convertToAwsInput(input);
      return await executeChild(createAwsCluster, { args: [awsInput] });
    }
    case CloudProvider.GCP: {
      const gcpInput = convertToGcpInput(input);
      return await executeChild(createGcpCluster, { args: [gcpInput] });
    }
    default:
      throw new Error(`Unsupported provider: ${input.provider}`);
  }
}

This approach, which we used at Temporal to go multi-cloud, makes it easier to:

  • Add support for new cloud providers without changing existing code
  • Switch providers for specific resources with minimal changes
  • Create workflows that deploy to multiple clouds simultaneously
  • Implement a cloud migration strategy with less effort

By thinking ahead about naming conventions and abstraction layers, you’ll build a more flexible and maintainable system that can adapt to changing requirements over time.

Package Structure and Encapsulation

Good Practice
Keep Activities within their package boundaries and use Child Workflows for cross-package functionality.

When writing code, we use packages – whether in Golang, Python or other languages – to separate logical aspects of our application. When working with Workflows and Activities it’s very easy to lose sight of what the boundaries are. I usually like to consider activities as internal functions and workflows as the interface that gets exposed. This is similar to encapsulation principles for object-oriented programming (OOP).

Activities should be treated as implementation details of a package, not exposed directly to other packages. When you need functionality from another package, call its workflows instead of its activities. If you find yourself needing to share activities across packages, consider moving them to a dedicated common activities package.

This approach maintains clear package boundaries and prevents circular dependencies, making your codebase easier to navigate and maintain over time.
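
As a rough sketch of what this looks like in Go (the package and type names here are made up for illustration), a billing package would expose a Workflow, and an orders package would call that Workflow rather than billing's activities:

// package billing: the activity is an implementation detail of the package.
func chargeCustomer(ctx context.Context, input ChargeCustomerInput) (ChargeCustomerOutput, error) {
    // ...
}

// package billing: the workflow is the interface other packages consume.
func ChargeCustomerWorkflow(ctx workflow.Context, input ChargeCustomerInput) (ChargeCustomerOutput, error) {
    // ... orchestrates chargeCustomer and friends ...
}

// package orders: depends on billing through its workflow only.
func ProcessOrderWorkflow(ctx workflow.Context, input ProcessOrderInput) (ProcessOrderOutput, error) {
    var charge billing.ChargeCustomerOutput
    err := workflow.ExecuteChildWorkflow(
        ctx, billing.ChargeCustomerWorkflow, billing.ChargeCustomerInput{CustomerID: input.CustomerID},
    ).Get(ctx, &charge)
    if err != nil {
        return ProcessOrderOutput{}, err
    }
    // ...
    return ProcessOrderOutput{}, nil
}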

Idempotency

Good Practice
Design both Activities and Workflows to be idempotent.

When writing Temporal Workflows, we’re used to having to think about determinism: Workflows need to take the same path when being replayed, or we’ll get a non-deterministic error. However, determinism does not necessarily mean idempotency. With a few exceptions, when writing Workflows and Activities that operate on resources, you will want to aim for idempotency too.

For activities, this means the same input should always produce the same output regardless of how many times it’s executed. This is crucial because Temporal will retry activities when they fail. So, when writing activities that:

  • Create resources: Check if they already exist first
  • Delete resources: Treat “not found” responses as successes (I usually rely on a deleted flag in the output to indicate whether the call succeeded because it actually deleted the resource or because the resource didn’t exist – see the sketch below)
  • Call external services: Ensure operations aren’t performed twice
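
For example, a deletion activity following the bullets above might look like this Go sketch (the awsClient call and the isNotFound helper are placeholders):

type DeleteAwsClusterOutput struct {
    // Deleted reports whether this call actually removed the resource,
    // as opposed to the resource already being gone.
    Deleted bool
}

func DeleteAwsCluster(ctx context.Context, input DeleteAwsClusterInput) (DeleteAwsClusterOutput, error) {
    err := awsClient.DeleteCluster(ctx, input.ClusterID) // placeholder client call
    if err != nil {
        if isNotFound(err) { // placeholder "not found" check
            // The resource is already gone: treat this as a success so that
            // retries and re-runs of the workflow stay idempotent.
            return DeleteAwsClusterOutput{Deleted: false}, nil
        }
        return DeleteAwsClusterOutput{}, err
    }
    return DeleteAwsClusterOutput{Deleted: true}, nil
}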

For workflows, aim for end-to-end idempotency. A workflow that creates complex resources should safely handle cases where those resources already exist. While this is challenging, it should be a design goal for all new workflows.

Although compensation logic (cleanup on failure) is useful, it’s insufficient for true idempotency since it won’t handle workflow termination or reset scenarios. Design for idempotency first, with compensation as a secondary safety net.

Operational Considerations

Timeouts, Retries and SLAs

Good Practice
Design Workflows to be invincible and Activities to be resilient.

Workflow Status

Good Practice
Model expected errors as completions with error information, not as workflow failures.

A properly designed Temporal workflow should never fail with valid input. If a workflow fails (rather than completes with an error result), it indicates an issue with the orchestration logic itself.

Think of workflow completion like an exit() call with a return code, while workflow failure is like a system crash. Expected errors should be modeled as completions with error information in the result object.

We could, for instance, model the output with a status flag indicating how the Workflow completed, or with a generic Result wrapper type (similar to what Rust does) that carries either success or error completion data.
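
Here is a minimal Go sketch of that idea (the ProvisionCluster names and status values are hypothetical):

type ProvisionClusterResult struct {
    Status string // e.g. "succeeded", "quota_exceeded"
    // Error carries details about an expected, business-level failure.
    Error string
    // ClusterID is only set when Status is "succeeded".
    ClusterID string
}

func ProvisionClusterWorkflow(ctx workflow.Context, input ProvisionClusterInput) (ProvisionClusterResult, error) {
    err := workflow.ExecuteActivity(ctx, CheckQuota, input).Get(ctx, nil)
    if err != nil {
        // An expected error: complete the workflow with error information
        // instead of failing it.
        return ProvisionClusterResult{Status: "quota_exceeded", Error: err.Error()}, nil
    }
    // ... the rest of the orchestration ...
    return ProvisionClusterResult{Status: "succeeded", ClusterID: "..."}, nil
}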

Activity Execution Settings

Good Practice
Use generous schedule-to-close timeouts, fair start-to-close timeouts, and avoid retry limits for Activities.

I recommend against setting maximum retry limits for activities. Instead:

  • Set a generous schedule-to-close timeout (weeks or months) – this is the time between the moment an Activity is scheduled and the moment it has finished executing
  • Configure a reasonable start-to-close timeout based on expected execution time – this is the time between the start and the end of a single attempt of an Activity
  • Implement monitoring to alert on excessive retries or duration (you can use interceptors to conditionally emit metrics that drive your alerts)
  • Use a schedule-to-start timeout only when operations might become obsolete after delays (this is rarely the case, and if you need to resort to it, you should ask yourself the question twice)
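
In the Go SDK, activity options along those lines might look like this sketch (the durations are illustrative and depend on your workload):

ao := workflow.ActivityOptions{
    // Generous overall budget: let Temporal keep retrying for a long time.
    ScheduleToCloseTimeout: 30 * 24 * time.Hour,
    // A single attempt is expected to finish well within this.
    StartToCloseTimeout: 10 * time.Minute,
    // Detect stuck or crashed workers quickly (see the heartbeating section below).
    HeartbeatTimeout: 30 * time.Second,
    // Note: no MaximumAttempts – rely on monitoring rather than a hard retry cap.
    RetryPolicy: &temporal.RetryPolicy{
        InitialInterval:    time.Second,
        BackoffCoefficient: 2.0,
        MaximumInterval:    5 * time.Minute,
    },
}
ctx = workflow.WithActivityOptions(ctx, ao)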

Here’s an example of implementing an activity interceptor that tracks attempts and total duration since scheduling using basic gauges:

Note
This is a simplified example. Gauges are not ideal for alerting, as they can’t capture all of the relevant information without cardinality creep. Ideally, you would use a logging-based approach that lets you alert on specific log entries, so that it is easy to alert on, and resolve, issues for specific activities in specific workflows.
type ActivityMetricsInterceptor struct {
    logger        *zap.Logger
    metricsClient MetricsClient
    interceptor.ActivityInboundInterceptorBase
}

func (i *ActivityMetricsInterceptor) ExecuteActivity(ctx context.Context, in *interceptor.ExecuteActivityInput) (interface{}, error) {
    info := activity.GetInfo(ctx)
    activityType := info.ActivityType.Name
    attempt := info.Attempt

    // Prepare tags for metrics
    scheduledTime := info.ScheduledTime
    workflowId := info.WorkflowExecution.ID
    workflowType := info.WorkflowType.Name

    metricsTags := map[string]string{
        "activity_type": activityType,
        "workflow_type": workflowType,
        "workflow_id":   workflowId,
    }

    // Emit metrics about the current attempt number
    i.metricsClient.Gauge("activity.attempt.count", float64(attempt), metricsTags)

    // Start a goroutine to emit periodic duration metrics
    durationCtx, durationCancel := context.WithCancel(ctx)
    defer durationCancel()

    go func() {
        ticker := time.NewTicker(5 * time.Second)
        defer ticker.Stop()

        for {
            select {
            case <-ticker.C:
                elapsedSinceScheduled := time.Since(scheduledTime)
                i.metricsClient.Gauge("activity.total.duration", elapsedSinceScheduled.Seconds(), metricsTags)
            case <-durationCtx.Done():
                return
            }
        }
    }()

    // Execute the activity
    result, err := i.Next.ExecuteActivity(ctx, in)

    // Stop the duration reporting goroutine
    durationCancel()

    // If activity was successful, reset attempt gauge to 0
    if err == nil {
        i.metricsClient.Gauge("activity.attempt.count", 0, metricsTags)
        i.metricsClient.Gauge("activity.total.duration", 0, metricsTags)
    }

    return result, err
}
class ActivityMetricsInterceptor(ActivityInboundInterceptor):
    def __init__(self, next: ActivityInboundInterceptor, metrics_client, logger):
        super().__init__(next)
        self.metrics_client = metrics_client
        self.logger = logger

    async def execute_activity(self, input: Any) -> Any:
        info = activity.info()
        activity_type = info.activity_type
        attempt = info.attempt

        # Prepare tags for metrics
        scheduled_time = info.scheduled_time
        workflow_id = info.workflow_id
        workflow_type = info.workflow_type

        metrics_tags = {
            "activity_type": activity_type,
            "workflow_id": workflow_id,
            "workflow_type": workflow_type,
        }

        # Track current attempt number as a gauge
        self.metrics_client.gauge("activity.attempt.count", float(attempt), metrics_tags)

        # Flag to control the metrics thread
        stop_metrics = False

        # Start a thread to emit duration metrics every 5 seconds
        def duration_reporting():
            while not stop_metrics:
                try:
                    # Calculate current total duration since scheduling
                    elapsed_since_scheduled = datetime.now(timezone.utc) - scheduled_time

                    self.metrics_client.gauge("activity.total.duration",
                        elapsed_since_scheduled.total_seconds(), metrics_tags)

                    time.sleep(5)  # Report every 5 seconds
                except Exception as e:
                    self.logger.error(f"Error reporting metrics: {e}")
                    break

        # Start the metrics reporting thread
        thread = threading.Thread(target=duration_reporting)
        thread.daemon = True
        thread.start()

        try:
            # Execute the activity
            result = await super().execute_activity(input)
            success = True
            return result
        except Exception:
            success = False
            raise
        finally:
            # Stop the metrics reporting
            stop_metrics = True
            thread.join(timeout=1.0)

            # If activity was successful, reset gauges
            if success:
                self.metrics_client.gauge("activity.attempt.count", 0, metrics_tags)
                self.metrics_client.gauge("activity.total.duration", 0, metrics_tags)
class ActivityMetricsInterceptor implements ActivityInboundInterceptor {
  constructor(
    private readonly next: ActivityInboundInterceptor,
    private readonly metricsClient: MetricsClient,
    private readonly logger: Console
  ) {}

  async execute(input: ActivityExecuteInput): Promise<unknown> {
    const info = Context.current().info;
    const activityType = info.activityType;
    const attempt = info.attempt;

    // Prepare tags for metrics
    const scheduledTimestampMs = info.scheduledTimestampMs;
    const workflowId = info.workflowExecution.workflowId;
    const workflowType = info.workflowType;

    const metricsTags = {
      activity_type: activityType,
      workflow_id: workflowId,
      workflow_type: workflowType,
    };

    // Track current attempt number as a gauge
    this.metricsClient.gauge('activity.attempt.count', attempt, metricsTags);

    // Flag to control the interval
    let stopMetrics = false;

    // Set up interval to emit duration metrics every 5 seconds
    const metricsInterval = setInterval(() => {
      if (stopMetrics) {
        clearInterval(metricsInterval);
        return;
      }

      // Calculate current total duration since scheduling (in milliseconds)
      const elapsedSinceScheduled = Date.now() - scheduledTimestampMs;

      this.metricsClient.gauge('activity.total.duration', elapsedSinceScheduled / 1000, metricsTags);
    }, 5000);

    try {
      // Execute the activity
      const result = await this.next.execute(input);

      return result;
    } finally {
      // Stop the metrics reporting
      stopMetrics = true;
      clearInterval(metricsInterval);

      // If activity was successful, reset gauges
      this.metricsClient.gauge('activity.attempt.count', 0, metricsTags);
      this.metricsClient.gauge('activity.total.duration', 0, metricsTags);
    }
  }
}

This allows you to fix issues in Activities without causing workflow failures while still maintaining visibility into problems. This is a way to use Temporal as it should be used: to make your applications invincible.

Workflow Execution Settings

Good Practice
Use long timeouts for workflows and rely on versioning for fixes.

Similar to activities, avoid short timeouts for workflows. If you must set a workflow execution timeout (WorkflowExecutionTimeout in Go, execution_timeout in Python, workflowExecutionTimeout in TypeScript), make it very long (weeks or months). Remember that if a Child Workflow times out or fails, the parent typically fails too, undermining Temporal’s recovery capabilities.

When fixing issues in Workflows, maintain determinism using workflow versioning to ensure history replay works correctly. In edge cases, you can also take advantage of the reset option to retry a workflow from an earlier step – but this should be exceptional.
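
As a quick reminder of what versioning looks like in the Go SDK (the change ID and activity names below are arbitrary):

// Guard a change behind GetVersion so that histories recorded before the
// change still replay deterministically.
v := workflow.GetVersion(ctx, "use-new-notifier", workflow.DefaultVersion, 1)
if v == workflow.DefaultVersion {
    // Old behavior, kept for workflows started before the change.
    err = workflow.ExecuteActivity(ctx, NotifyByEmail, input).Get(ctx, nil)
} else {
    // New behavior for workflows started after the change.
    err = workflow.ExecuteActivity(ctx, NotifyBySlack, input).Get(ctx, nil)
}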

Activity Heartbeating

Basic Heartbeating for Liveness

Good Practice
Use heartbeats for long-running activities.

Temporal uses timeouts to know when a worker has crashed or is stuck on an activity, and to reassign that activity to another worker. For activities that run for extended periods, it would be tempting to use a long timeout, but this would increase the delay before a stuck or crashed worker is identified and the activity retried. Instead, set a reasonable heartbeat timeout (around 30 seconds) when starting the activity, and regularly call activity.RecordHeartbeat() (Go), activity.heartbeat() (Python), or heartbeat() (TypeScript).

If your core activity code can’t call heartbeat directly (e.g. calling out to an API that might take a while to answer, waiting on an I/O operation, etc.), spawn a separate thread, goroutine, or async task to handle it. While this won’t detect infinite loops, it still signals that the worker is running.

Here’s an example of implementing heartbeating in a long-running activity that calls an external API:

func LongRunningActivity(ctx context.Context, input LongRunningInput) (LongRunningOutput, error) {
    logger := activity.GetLogger(ctx)
    logger.Info("Starting long running activity", "input", input)

    // Set up heartbeat details
    heartbeatInterval := 10 * time.Second

    // Create a context for the heartbeat goroutine
    heartbeatCtx, cancel := context.WithCancel(ctx)
    defer cancel()

    // Start a goroutine to send heartbeats
    go func() {
        ticker := time.NewTicker(heartbeatInterval)
        defer ticker.Stop()

        for {
            select {
            case <-ticker.C:
                // Record heartbeat with metadata about the current state
                activity.RecordHeartbeat(ctx, "Waiting for API response")
                logger.Info("Heartbeat recorded")
            case <-heartbeatCtx.Done():
                return
            }
        }
    }()

    // Make the long-running API call
    result, err := callExternalAPIWithLongProcessingTime(ctx, input.RequestData)
    if err != nil {
        return LongRunningOutput{}, err
    }

    return LongRunningOutput{
        Result: result,
        Status: "Completed",
    }, nil
}

// This is the API call that might take a long time to complete
func callExternalAPIWithLongProcessingTime(ctx context.Context, requestData string) (string, error) {
    // In a real implementation, this would make an HTTP request or other external call
    // that could take minutes or even hours to complete

    // Simulate a long-running operation
    select {
    case <-time.After(5 * time.Minute):
        return "API processing completed successfully", nil
    case <-ctx.Done():
        return "", ctx.Err()
    }
}

// Workflow implementation
func LongRunningWorkflow(ctx workflow.Context, input LongRunningInput) (LongRunningOutput, error) {
    options := workflow.ActivityOptions{
        StartToCloseTimeout: 2 * time.Hour,
        HeartbeatTimeout:    30 * time.Second,
    }

    ctx = workflow.WithActivityOptions(ctx, options)

    var result LongRunningOutput
    err := workflow.ExecuteActivity(ctx, LongRunningActivity, input).Get(ctx, &result)
    if err != nil {
        return LongRunningOutput{}, err
    }

    return result, nil
}
import asyncio
from dataclasses import dataclass
from datetime import timedelta
from temporalio import activity, workflow

@dataclass
class LongRunningInput:
    request_data: str
    timeout_seconds: int

@dataclass
class LongRunningOutput:
    result: str
    status: str

async def call_external_api_with_long_processing_time(request_data: str) -> str:
    """Simulates a long-running API call that we can't directly modify to add heartbeats."""
    # In a real implementation, this would make an API request that takes a long time
    await asyncio.sleep(300)  # Simulate 5 minutes of processing
    return f"API processed data: {request_data}"

@activity.defn
async def long_running_activity(input: LongRunningInput) -> LongRunningOutput:
    activity.logger.info(f"Starting long running activity with input: {input}")

    # The API call itself cannot heartbeat while it is waiting, so record
    # heartbeats from a separate asyncio task.
    async def heartbeat_loop():
        while True:
            activity.heartbeat("Waiting for API response")
            activity.logger.info("Heartbeat sent")
            await asyncio.sleep(10)  # Heartbeat every 10 seconds

    heartbeat_task = asyncio.create_task(heartbeat_loop())

    try:
        # Make the long-running API call
        result = await call_external_api_with_long_processing_time(input.request_data)
        return LongRunningOutput(
            result=result,
            status="Completed",
        )
    finally:
        # Stop the heartbeat task
        heartbeat_task.cancel()

@workflow.defn
class LongRunningWorkflow:
    @workflow.run
    async def run(self, input: LongRunningInput) -> LongRunningOutput:
        # Configure the activity with heartbeat timeout
        return await workflow.execute_activity(
            long_running_activity,
            input,
            start_to_close_timeout=timedelta(hours=2),
            heartbeat_timeout=timedelta(seconds=30)
        )
// activities.ts
import { Context } from '@temporalio/activity';

export interface LongRunningInput {
  requestData: string;
  timeoutSeconds: number;
}

export interface LongRunningOutput {
  result: string;
  status: string;
}

// Activity implementation
export async function longRunningActivity(input: LongRunningInput): Promise<LongRunningOutput> {
  console.log(`Starting long running activity with input: ${JSON.stringify(input)}`);

  // Set up heartbeat interval
  const heartbeatInterval = setInterval(() => {
    // Send heartbeat with current state
    Context.current().heartbeat('Waiting for API response');
    console.log('Heartbeat sent');
  }, 10000); // Heartbeat every 10 seconds

  try {
    // Call the external API that might take a long time
    const result = await callExternalAPIWithLongProcessingTime(input.requestData);

    return {
      result,
      status: 'Completed',
    };
  } finally {
    // Clean up the heartbeat interval
    clearInterval(heartbeatInterval);
  }
}

// This simulates an external API call that takes a long time to complete.
// A real implementation should also observe Context.current().cancellationSignal
// so it can stop waiting when the activity is cancelled.
async function callExternalAPIWithLongProcessingTime(requestData: string): Promise<string> {
  // Simulate a long operation that takes 5 minutes
  return new Promise((resolve) => {
    setTimeout(() => {
      resolve(`API processed data: ${requestData}`);
    }, 5 * 60 * 1000);
  });
}

// workflows.ts - workflow code lives in a separate file from the activities
import { proxyActivities } from '@temporalio/workflow';
import type * as activities from './activities';

const { longRunningActivity } = proxyActivities<typeof activities>({
  startToCloseTimeout: '2 hours',
  heartbeatTimeout: '30 seconds',
});

export async function longRunningWorkflow(
  input: activities.LongRunningInput
): Promise<activities.LongRunningOutput> {
  return await longRunningActivity(input);
}

State Preservation with Heartbeats

Good Practice
When appropriate, leverage heartbeats to store progress information that allows a subsequent attempt to resume the activity where it left off.

Heartbeats can also be used to store metadata about the current state of the activity. This is useful for long-running activities that may need to be resumed or retried later. For example, if an activity is processing a large dataset, you can send heartbeats with progress updates, allowing you to resume from the last checkpoint if the activity fails.

While at Temporal, I used this approach for Route53 DNS updates: after triggering the update, you need to wait for the change to propagate fully. I used heartbeats to store the change ID, which made it possible to skip triggering the change (another time!) if the activity timed out or crashed while waiting.
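
Here is a rough Go sketch of that Route53-style pattern (the DNS helper functions and the input/output types are placeholders):

func UpdateDnsRecordActivity(ctx context.Context, input UpdateDnsRecordInput) (UpdateDnsRecordOutput, error) {
    var changeID string

    // If a previous attempt already triggered the change, pick up its ID from
    // the heartbeat details instead of triggering the change again.
    if activity.HasHeartbeatDetails(ctx) {
        if err := activity.GetHeartbeatDetails(ctx, &changeID); err != nil {
            changeID = ""
        }
    }

    if changeID == "" {
        var err error
        changeID, err = triggerDnsChange(ctx, input) // placeholder for the real API call
        if err != nil {
            return UpdateDnsRecordOutput{}, err
        }
    }

    // Wait for propagation, heartbeating the change ID so a retry can resume here.
    for {
        done, err := isChangePropagated(ctx, changeID) // placeholder status check
        if err != nil {
            return UpdateDnsRecordOutput{}, err
        }
        if done {
            return UpdateDnsRecordOutput{ChangeID: changeID}, nil
        }
        activity.RecordHeartbeat(ctx, changeID)
        time.Sleep(10 * time.Second)
    }
}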

Advanced Patterns

Distributed Concurrency Control

Good Practice
Keep workflows focused and implement concurrency control as token-providing workflows.

Entity Workflows are very long-running Workflows with a well-known Workflow ID that can be addressed using signals. They can be used, for instance, to represent a shopping cart for an e-commerce website, to cache API calls, or simply to centralize and serialize operations.

However, much like the saying “once you have a hammer, everything looks like a nail”, it’s common to see people use entity workflows to handle arbitrary operations. This leads to opaque Workflow histories that are difficult to reason about, and it undermines one of Temporal’s neat features: being able to follow and audit the execution of your Workflows.

Instead, you should implement distributed concurrency control and rate limiting as dedicated Workflows that provide tokens upon request. Your operation Workflow would then request a token using Signal-With-Start or Update-With-Start, run the operation, and free the token once done. Your concurrency control Workflow can complete when no token has been requested in a given time period.

Here’s an illustration of how this pattern works with ad-hoc workers:

For long-lived workflows, be mindful of history growth. Use workflow.NewContinueAsNewError() (Go), workflow.continue_as_new() (Python), or continueAsNew() (TypeScript) to start fresh workflow executions when appropriate, being careful to handle any pending signals correctly. Regularly check workflow.GetInfo(ctx).GetContinueAsNewSuggested() (Go), workflow.info().is_continue_as_new_suggested() (Python), or workflowInfo().continueAsNewSuggested (TypeScript) to determine when to continue as new.
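
To make the token pattern more concrete, here is a rough Go sketch of a concurrency-limiter Workflow (the signal names, payloads, and timings are all illustrative, and a production version would also need to carry in-flight state across Continue-As-New):

type TokenRequest struct {
    WorkflowID string // the requester to signal back once a token is available
}

func ConcurrencyLimiterWorkflow(ctx workflow.Context, maxTokens int) error {
    acquireCh := workflow.GetSignalChannel(ctx, "acquire")
    releaseCh := workflow.GetSignalChannel(ctx, "release")

    inUse := 0
    var waiting []TokenRequest

    grant := func(req TokenRequest) {
        inUse++
        // Tell the requester it can proceed; it waits on a "token-granted" signal.
        // Error handling on the returned Future is omitted in this sketch.
        _ = workflow.SignalExternalWorkflow(ctx, req.WorkflowID, "", "token-granted", nil)
    }

    for {
        idle := false
        selector := workflow.NewSelector(ctx)
        selector.AddReceive(acquireCh, func(c workflow.ReceiveChannel, _ bool) {
            var req TokenRequest
            c.Receive(ctx, &req)
            if inUse < maxTokens {
                grant(req)
            } else {
                waiting = append(waiting, req)
            }
        })
        selector.AddReceive(releaseCh, func(c workflow.ReceiveChannel, _ bool) {
            c.Receive(ctx, nil)
            inUse--
            if len(waiting) > 0 && inUse < maxTokens {
                next := waiting[0]
                waiting = waiting[1:]
                grant(next)
            }
        })
        // Allow the workflow to complete after a period with no activity.
        selector.AddFuture(workflow.NewTimer(ctx, time.Hour), func(workflow.Future) {
            idle = true
        })
        selector.Select(ctx)

        if idle && inUse == 0 && len(waiting) == 0 {
            return nil
        }
        // Keep the history bounded on busy limiters.
        if workflow.GetInfo(ctx).GetContinueAsNewSuggested() && len(waiting) == 0 && inUse == 0 {
            return workflow.NewContinueAsNewError(ctx, ConcurrencyLimiterWorkflow, maxTokens)
        }
    }
}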

Outro

Temporal’s power lies in its ability to make distributed systems more resilient and maintainable, but this power comes with responsibility. By following these good practices, you’ll create workflows that are:

  • Evolvable: Adaptable to changing requirements without breaking existing functionality
  • Maintainable: Clear boundaries and focused responsibilities
  • Resilient: Robust against failures and unexpected conditions
  • Observable: Transparent execution and clear error handling

Remember that Temporal is not just a workflow engine – it’s a distributed operating system for executing your application. Treat it as such, and you’ll build systems that can withstand the challenges of distributed computing while remaining simple to understand and maintain.


Special thanks to Joe and Travis for their feedback on early drafts of this document and contributions to these recommendations. And thanks to Derek for having me see Temporal as the operating system it is.