Code for more reliable systems
Temporal is an open source programming model that can simplify your code, make your applications more reliable, and help you deliver more features faster.
Fail less - Fail better
Just say no to
Temporal Cloud provides Snap a highly reliable and scalable foundation for Snap Stories, enabling us to deliver the amazing global experience our users have come to expect.
Snap Engineering
Code smart
Temporal simplifies your application logic to make your code easier to maintain and your developers more productive. See for yourself with this sample money transfer app that reduced 241 lines of code to 98.
package app
import (
"context"
"fmt"
"time"
"go.temporal.io/sdk/temporal"
"go.temporal.io/sdk/workflow"
)
// BankingService is the bank client that the Activities call into.
//
// Withdraw and Deposit each take an account number, an amount and a reference
// ID, and return a confirmation string (surfaced by MoneyTransfer in its
// success message) or an error.
// NOTE(review): the Activities derive the reference ID from
// PaymentDetails.ReferenceID, presumably so the bank can deduplicate retried
// calls — confirm the real client treats it as an idempotency key.
type BankingService interface {
	Withdraw(accountNumber string, amount int, referenceID string) (string, error)
	Deposit(accountNumber string, amount int, referenceID string) (string, error)
}
// Activities groups the Activity implementations (Withdraw, Deposit, Refund)
// that the MoneyTransfer Workflow invokes by name.
type Activities struct {
	// bank is the banking client used by every Activity.
	// NOTE(review): the field is unexported and no constructor is visible
	// here; confirm how it is populated before the Activities are registered.
	bank BankingService
}
// PaymentDetails is the input of the MoneyTransfer Workflow and of each of its
// Activities.
type PaymentDetails struct {
	// ReferenceID identifies the transfer. Each Activity appends a suffix
	// ("-withdrawal", "-deposit" or "-refund") to build the reference ID it
	// sends to the bank.
	ReferenceID string
	// SourceAccount is passed to bank.Withdraw by Withdraw and to
	// bank.Deposit by Refund.
	SourceAccount string
	// TargetAccount is passed to bank.Deposit by Deposit.
	TargetAccount string
	// Amount is the amount to move (unit not specified here).
	Amount int
}
// MoneyTransfer is the Temporal Workflow that moves input.Amount from
// input.SourceAccount to input.TargetAccount.
//
// It executes the "Withdraw" Activity and then the "Deposit" Activity. If the
// deposit fails, it executes the "Refund" Activity to put the money back into
// the source account. All three Activities share the retry policy below.
//
// On success it returns a summary containing the confirmation strings of the
// withdraw and deposit Activities. On failure it returns an error that wraps
// the Activity error(s) with %w, so callers can inspect every underlying
// failure with errors.Is / errors.As.
func MoneyTransfer(ctx workflow.Context, input PaymentDetails) (string, error) {
	// RetryPolicy specifies how to automatically handle retries if an Activity fails.
	retrypolicy := &temporal.RetryPolicy{
		InitialInterval:    time.Second,
		BackoffCoefficient: 2.0,
		MaximumInterval:    100 * time.Second,
		MaximumAttempts:    0, // unlimited retries
		// NOTE(review): these strings are matched against the failure's error
		// *type* (e.g. the type given to temporal.NewApplicationError), not
		// against Go sentinel variable names. Confirm the Activities / bank
		// client produce errors with these types; otherwise such failures are
		// retried like any other (MaximumAttempts is 0).
		NonRetryableErrorTypes: []string{"ErrInvalidAccount", "ErrInsufficientFunds"},
	}
	options := workflow.ActivityOptions{
		// Timeout options specify when to automatically timeout Activity functions.
		StartToCloseTimeout: time.Minute,
		// Optionally provide a customized RetryPolicy.
		// Temporal retries failed Activities by default.
		RetryPolicy: retrypolicy,
	}
	// Apply the options.
	ctx = workflow.WithActivityOptions(ctx, options)

	// Withdraw money.
	var withdrawOutput string
	withdrawErr := workflow.ExecuteActivity(ctx, "Withdraw", input).Get(ctx, &withdrawOutput)
	if withdrawErr != nil {
		return "", withdrawErr
	}

	// Deposit money.
	var depositOutput string
	depositErr := workflow.ExecuteActivity(ctx, "Deposit", input).Get(ctx, &depositOutput)
	if depositErr != nil {
		// The deposit failed; put money back in original account.
		var result string
		refundErr := workflow.ExecuteActivity(ctx, "Refund", input).Get(ctx, &result)
		if refundErr != nil {
			// Wrap BOTH errors (multiple %w verbs require Go 1.20+) so the
			// deposit failure stays inspectable alongside the refund failure.
			// The rendered message is the same as with %v.
			return "",
				fmt.Errorf("Deposit: failed to deposit money into %v: %w. Money could not be returned to %v: %w",
					input.TargetAccount, depositErr, input.SourceAccount, refundErr)
		}
		return "", fmt.Errorf("Deposit: failed to deposit money into %v: Money returned to %v: %w",
			input.TargetAccount, input.SourceAccount, depositErr)
	}

	result := fmt.Sprintf("Transfer complete (transaction IDs: %s, %s)", withdrawOutput, depositOutput)
	return result, nil
}
// Withdraw is the Activity that withdraws data.Amount from data.SourceAccount
// through the bank client. The bank reference ID is data.ReferenceID with a
// "-withdrawal" suffix; the bank's confirmation and error are returned as-is.
func (a *Activities) Withdraw(ctx context.Context, data PaymentDetails) (string, error) {
	return a.bank.Withdraw(data.SourceAccount, data.Amount, data.ReferenceID+"-withdrawal")
}
// Deposit is the Activity that deposits data.Amount into data.TargetAccount
// through the bank client, tagging the call with data.ReferenceID plus a
// "-deposit" suffix. The bank's result is passed through unchanged.
func (a *Activities) Deposit(ctx context.Context, data PaymentDetails) (string, error) {
	ref := data.ReferenceID + "-deposit"
	return a.bank.Deposit(data.TargetAccount, data.Amount, ref)
}
// Refund is the compensating Activity: it deposits data.Amount back into
// data.SourceAccount, using data.ReferenceID plus a "-refund" suffix as the
// bank reference ID, and returns the bank's result unchanged.
func (a *Activities) Refund(ctx context.Context, data PaymentDetails) (string, error) {
	account, ref := data.SourceAccount, data.ReferenceID+"-refund"
	return a.bank.Deposit(account, data.Amount, ref)
}
package main
import (
"context"
"encoding/json"
"errors"
"fmt"
"log"
"math"
"time"
)
// Status is the persisted step of a money transfer's state machine.
//
// The numeric values are spelled out because they are stored via Persistence
// and compared with < and > by ProcessMoneyTransferEvent. Note that the
// terminal statuses (Failed, Succeeded) are numerically lower than the
// in-progress ones.
type Status uint32

const (
	// Started: the transfer was accepted; nothing has been scheduled yet.
	Started Status = 0
	// Failed: terminal; the withdrawal or the refund reported an error.
	Failed Status = 1
	// Succeeded: terminal; the deposit completed, or the refund after a failed deposit did.
	Succeeded Status = 2
	// Withdrawing: the withdraw activity has been scheduled.
	Withdrawing Status = 3
	// Depositing: the deposit activity has been scheduled.
	Depositing Status = 4
	// Refunding: the refund activity has been scheduled after a failed deposit.
	Refunding Status = 5
)
// Sentinel errors exchanged with the storage and bank integrations; compare
// against them with errors.Is.
var (
	// ErrStorageConflict is returned by the storage API when a CompareAndSwap
	// operation loses a race with a concurrent update.
	ErrStorageConflict = errors.New("storage conflict")

	// ErrInsufficientFunds is returned by the bank API. It is a business-level
	// error and is not retried.
	ErrInsufficientFunds = errors.New("insufficient funds")

	// ErrAccountNotFound is returned by the bank API. It is a business-level
	// error and is not retried.
	ErrAccountNotFound = errors.New("account not found")
)
// BankingService is the bank client used by ProcessActivity.
//
// Both methods take an account number, an amount and a reference ID, and
// return a confirmation string and an error. ProcessActivity treats
// ErrAccountNotFound and ErrInsufficientFunds as business errors and any other
// error as transient. The reference ID is derived from the transaction's
// ReferenceID and is relied on as an idempotency token, because duplicate
// activity tasks can be generated.
type BankingService interface {
	Withdraw(accountNumber string, amount int, referenceID string) (string, error)
	Deposit(accountNumber string, amount int, referenceID string) (string, error)
}
// Persistence stores per-transaction state; callers key it by ReferenceID.
type Persistence interface {
	// Load returns the state stored under key. ProcessMoneyTransferEvent
	// expects a Status and discards the task otherwise.
	// NOTE(review): behavior for a key that was never stored is not visible here.
	Load(ctx context.Context, key string) (state interface{}, err error)
	// CompareAndSwap replaces the state under key with state, presumably only
	// if the current value equals expected. It returns ErrStorageConflict when
	// the value was changed concurrently.
	CompareAndSwap(ctx context.Context, key string, state interface{}, expected interface{}) error
}
// Task is a unit of work pulled off a persistent task queue.
type Task struct {
	// QueueName is the queue the task came from; moveToDeadLetterQueue
	// derives the dead-letter queue name from it.
	QueueName string
	// Payload is the JSON-encoded body: a TransactionInput or an
	// ActivityInput, depending on the queue.
	Payload []byte
	// Attempt is used by calcBackoff as the exponent of the backoff.
	// NOTE(review): presumably the number of previous deliveries, starting at
	// 0 — confirm with the Queue implementation.
	Attempt uint
}
// Consumer handles one task delivered by Queue.Consume. It returns nothing;
// Run wraps each processor so that the consumer itself calls Ack on success or
// RetryLater on error.
type Consumer func(ctx context.Context, task Task)
// Queue represents a persistent task queue
type Queue interface {
	// Enqueue adds payload to the named queue.
	Enqueue(queue string, payload []byte) error
	// Consume registers consumer for tasks on the named queue.
	// NOTE(review): Run calls Consume twice in a row, which only works if it
	// does not block — confirm with the implementation.
	Consume(queue string, consumer Consumer)
	// Ack marks task as done; Run calls it when the processor returns nil.
	Ack(task Task)
	// RetryLater asks for task to be delivered again after duration; Run
	// passes calcBackoff(task).
	RetryLater(task Task, duration time.Duration)
}
// TransactionInput is the JSON payload of a money-transfer event: either the
// initial request or an activity result forwarded back by ProcessActivity.
// No json tags are declared, so the encoded keys are the Go field names.
type TransactionInput struct {
	// ReferenceID identifies the transaction and is the Persistence key.
	ReferenceID     string
	SourceAccountID string
	TargetAccountID string
	Amount          int
	// Used to report errors from activities: ProcessActivity sets it for
	// business errors, and ProcessMoneyTransferEvent branches on it.
	ErrorMessage string
	// Used to verify process consistency: ProcessMoneyTransferEvent compares it
	// with the persisted Status to detect stale or premature events.
	LastStatus Status
}
// ActivityInput is the JSON payload of a task on the activity queue.
type ActivityInput struct {
	// Type selects the operation in ProcessActivity: "deposit", "withdraw" or "refund".
	Type string
	// AccountID is the account the bank operation is applied to.
	AccountID string
	// Used to extract information about the transaction and forward it back to the transaction handler
	Transaction TransactionInput
}
// Worker holds the dependencies shared by the two task processors,
// ProcessMoneyTransferEvent and ProcessActivity.
type Worker struct {
	// queue is used to schedule activities, report results and dead-letter tasks.
	queue Queue
	// persistence holds the Status of each transaction.
	persistence Persistence
	// bank performs the actual withdraw/deposit operations.
	bank BankingService
}
// ProcessMoneyTransferEvent advances one transaction's state machine by a
// single step.
//
// The task payload is a JSON-encoded TransactionInput: either the initial
// transfer request or an activity result forwarded by ProcessActivity. The
// current Status is loaded from persistence and validated against
// input.LastStatus. The next activity (if any) is enqueued, and the new
// Status is committed with CompareAndSwap.
//
// Returning nil lets the caller acknowledge the task, including tasks that are
// deliberately discarded. Returning an error asks the caller to retry it later.
func (w *Worker) ProcessMoneyTransferEvent(ctx context.Context, task Task) error {
	var input TransactionInput
	if err := json.Unmarshal(task.Payload, &input); err != nil {
		log.Printf("Failed to unmarshal payload: %v", err)
		// Enqueue in dead letter queue for human inspection
		return moveToDeadLetterQueue(w.queue, task)
	}
	anyStatus, err := w.persistence.Load(ctx, input.ReferenceID)
	if err != nil {
		return err
	}
	status, ok := anyStatus.(Status)
	if !ok {
		log.Printf("Failed to load status from DB, got: %v", anyStatus)
		return nil // discard this task, we don't know what to do with it
	}
	if status == Failed || status == Succeeded {
		// The transaction already finished, so this is a late or duplicate
		// activity completion. This must be checked before the ordering checks
		// below: the terminal statuses are numerically lower than the
		// in-progress ones, so "status < input.LastStatus" would otherwise
		// retry this task forever.
		log.Printf("Transaction %s already completed with status %v, discarding task", input.ReferenceID, status)
		return nil
	}
	if status < input.LastStatus {
		// CompareAndSwap for the previous ProcessMoneyTransferEvent iteration has not completed before we got an activity completion.
		// Return an error and backoff retrying this task until our process is in a consistent state.
		return errors.New("got activity completion for uncommitted workflow state")
	}
	if status > input.LastStatus {
		log.Printf("Invalid status in task, got: %v, expected: %v", input.LastStatus, status)
		return nil // discard this task, we probably generated duplicate activities due to a crash before committing previous status
	}

	prevStatus := status
	var next *ActivityInput // activity to enqueue for the new status, if any
	switch status {
	case Started:
		status = Withdrawing
		next = &ActivityInput{Type: "withdraw", AccountID: input.SourceAccountID}
	case Withdrawing:
		if input.ErrorMessage != "" {
			// Could not withdraw, abort
			status = Failed
		} else {
			status = Depositing
			next = &ActivityInput{Type: "deposit", AccountID: input.TargetAccountID}
		}
	case Depositing:
		if input.ErrorMessage != "" {
			status = Refunding
			// Reset the error message
			input.ErrorMessage = ""
			next = &ActivityInput{Type: "refund", AccountID: input.SourceAccountID}
		} else {
			status = Succeeded
		}
	case Refunding:
		if input.ErrorMessage != "" {
			// Critical transaction failure, cannot refund.
			// In a real world example a human operator would probably need to examine this transaction.
			status = Failed
		} else {
			// NOTE(review): a successful refund is recorded as Succeeded even
			// though the money never reached the target account; consider a
			// distinct status.
			status = Succeeded
		}
	default:
		return nil // discard this task, unknown status. This shouldn't happen
	}

	if next != nil {
		// Stamp the forwarded transaction with the status the activity runs
		// under. ProcessActivity echoes it back and the checks above compare it
		// with the persisted status. Forwarding the previous status instead
		// made every completion look stale ("status > LastStatus"), so it was
		// discarded and the transfer stalled.
		input.LastStatus = status
		next.Transaction = input
		if err := scheduleActivity(w.queue, *next); err != nil {
			return err
		}
	}

	// Since we already enqueued tasks before storing the state we might generate duplicate tasks in case storage returns an error.
	// This is okay for our case because we use a unique transaction ID as an idempotency token.
	// Temporal prevents this situation by committing workflow state and the request to schedule an activity in a single transaction.
	// This API will fail in case the status was incremented concurrently.
	err = w.persistence.CompareAndSwap(ctx, input.ReferenceID, status, prevStatus)
	if err != nil && !errors.Is(err, ErrStorageConflict) {
		return err
	}
	return nil
}
// ProcessActivity performs the bank operation described by an ActivityInput
// task and reports the outcome back to the transaction's state machine.
//
// Business errors (ErrAccountNotFound, ErrInsufficientFunds) are recorded in
// the forwarded transaction's ErrorMessage. Any other bank error is returned so
// the task is retried later. Undecodable payloads and unknown activity types
// are moved to the dead letter queue, since retrying cannot fix them.
func (w *Worker) ProcessActivity(ctx context.Context, task Task) error {
	var input ActivityInput
	if err := json.Unmarshal(task.Payload, &input); err != nil {
		log.Printf("Failed to unmarshal payload: %v", err)
		return moveToDeadLetterQueue(w.queue, task)
	}
	tx := input.Transaction
	var operationErr error
	switch input.Type {
	case "deposit":
		_, operationErr = w.bank.Deposit(input.AccountID, tx.Amount, fmt.Sprintf("%s-deposit", tx.ReferenceID))
	case "withdraw":
		_, operationErr = w.bank.Withdraw(input.AccountID, tx.Amount, fmt.Sprintf("%s-withdraw", tx.ReferenceID))
	case "refund":
		_, operationErr = w.bank.Deposit(input.AccountID, tx.Amount, fmt.Sprintf("%s-refund", tx.ReferenceID))
	default:
		// An unknown activity type can never succeed, so returning an error
		// would retry it forever. Park it for human inspection instead.
		log.Printf("Unknown activity type %q", input.Type)
		return moveToDeadLetterQueue(w.queue, task)
	}
	if operationErr != nil {
		if !errors.Is(operationErr, ErrAccountNotFound) && !errors.Is(operationErr, ErrInsufficientFunds) {
			// Transient error, retry later
			return operationErr
		}
		// Business error, report back to transaction task
		tx.ErrorMessage = operationErr.Error()
	}
	payload, err := json.Marshal(tx)
	if err != nil {
		log.Printf("Failed to marshal payload: %v", err)
		// Enqueue in dead letter queue for human inspection
		return moveToDeadLetterQueue(w.queue, task)
	}
	// Report back on the queue Run consumes with ProcessMoneyTransferEvent.
	// The previous target, "transactions", is not consumed by Run, so results
	// never reached the state machine.
	return w.queue.Enqueue("money-transfer-events", payload)
}
// Run is the entry point for our program. It attaches the two processors to
// their queues. A processor returning nil gets its task acked; a non-nil
// error is logged and the task is rescheduled with calcBackoff.
func Run(queue Queue, persistence Persistence, bank BankingService) {
	w := Worker{queue: queue, persistence: persistence, bank: bank}

	withRetry := func(process func(context.Context, Task) error) Consumer {
		return func(ctx context.Context, t Task) {
			if err := process(ctx, t); err != nil {
				log.Printf("Failed to process task: %v", err)
				queue.RetryLater(t, calcBackoff(t))
				return
			}
			queue.Ack(t)
		}
	}

	queue.Consume("money-transfer-events", withRetry(w.ProcessMoneyTransferEvent))
	queue.Consume("money-transfer-activities", withRetry(w.ProcessActivity))
}
// scheduleActivity JSON-encodes input and enqueues it on the
// "money-transfer-activities" queue. Encoding and enqueue errors are returned
// unchanged.
func scheduleActivity(queue Queue, input ActivityInput) error {
	const activityQueue = "money-transfer-activities"
	encoded, marshalErr := json.Marshal(input)
	if marshalErr != nil {
		return marshalErr
	}
	return queue.Enqueue(activityQueue, encoded)
}
// calcBackoff calculates exponential backoff without jitter. The delay starts
// at 500ms, doubles with every attempt, and is capped at maxBackoff.
//
// Without the cap the delay grows without bound, and for large attempt counts
// the float64 value exceeds the range of time.Duration. Go leaves such a
// float-to-integer conversion implementation-dependent, which can yield
// negative or garbage delays.
func calcBackoff(task Task) time.Duration {
	const (
		initialInterval = 500 * time.Millisecond
		// maxBackoff matches the 100s MaximumInterval of the Temporal retry
		// policy used for the same transfer.
		maxBackoff = 100 * time.Second
	)
	backoff := float64(initialInterval) * math.Pow(2, float64(task.Attempt))
	// Compare in float64 before converting (also covers +Inf for huge attempts).
	if backoff > float64(maxBackoff) {
		return maxBackoff
	}
	return time.Duration(backoff)
}
// moveToDeadLetterQueue re-enqueues the task's raw payload on the dead-letter
// queue paired with its source queue: the source queue name plus "-DLQ".
func moveToDeadLetterQueue(queue Queue, task Task) error {
	deadLetterQueue := task.QueueName + "-DLQ"
	return queue.Enqueue(deadLetterQueue, task.Payload)
}
Your app code
Your application code with Temporal
package main
import (
"context"
"encoding/json"
"errors"
"fmt"
"log"
"math"
"time"
)
// Status is the persisted step of a money transfer's state machine.
//
// The numeric values are spelled out because they are stored via Persistence
// and compared with < and > by ProcessMoneyTransferEvent. Note that the
// terminal statuses (Failed, Succeeded) are numerically lower than the
// in-progress ones.
type Status uint32

const (
	// Started: the transfer was accepted; nothing has been scheduled yet.
	Started Status = 0
	// Failed: terminal; the withdrawal or the refund reported an error.
	Failed Status = 1
	// Succeeded: terminal; the deposit completed, or the refund after a failed deposit did.
	Succeeded Status = 2
	// Withdrawing: the withdraw activity has been scheduled.
	Withdrawing Status = 3
	// Depositing: the deposit activity has been scheduled.
	Depositing Status = 4
	// Refunding: the refund activity has been scheduled after a failed deposit.
	Refunding Status = 5
)
// Sentinel errors exchanged with the storage and bank integrations; compare
// against them with errors.Is.
var (
	// ErrStorageConflict is returned by the storage API when a CompareAndSwap
	// operation loses a race with a concurrent update.
	ErrStorageConflict = errors.New("storage conflict")

	// ErrInsufficientFunds is returned by the bank API. It is a business-level
	// error and is not retried.
	ErrInsufficientFunds = errors.New("insufficient funds")

	// ErrAccountNotFound is returned by the bank API. It is a business-level
	// error and is not retried.
	ErrAccountNotFound = errors.New("account not found")
)
// BankingService is the bank client used by ProcessActivity.
//
// Both methods take an account number, an amount and a reference ID, and
// return a confirmation string and an error. ProcessActivity treats
// ErrAccountNotFound and ErrInsufficientFunds as business errors and any other
// error as transient. The reference ID is derived from the transaction's
// ReferenceID and is relied on as an idempotency token, because duplicate
// activity tasks can be generated.
type BankingService interface {
	Withdraw(accountNumber string, amount int, referenceID string) (string, error)
	Deposit(accountNumber string, amount int, referenceID string) (string, error)
}
// Persistence stores per-transaction state; callers key it by ReferenceID.
type Persistence interface {
	// Load returns the state stored under key. ProcessMoneyTransferEvent
	// expects a Status and discards the task otherwise.
	// NOTE(review): behavior for a key that was never stored is not visible here.
	Load(ctx context.Context, key string) (state interface{}, err error)
	// CompareAndSwap replaces the state under key with state, presumably only
	// if the current value equals expected. It returns ErrStorageConflict when
	// the value was changed concurrently.
	CompareAndSwap(ctx context.Context, key string, state interface{}, expected interface{}) error
}
// Task is a unit of work pulled off a persistent task queue.
type Task struct {
	// QueueName is the queue the task came from; moveToDeadLetterQueue
	// derives the dead-letter queue name from it.
	QueueName string
	// Payload is the JSON-encoded body: a TransactionInput or an
	// ActivityInput, depending on the queue.
	Payload []byte
	// Attempt is used by calcBackoff as the exponent of the backoff.
	// NOTE(review): presumably the number of previous deliveries, starting at
	// 0 — confirm with the Queue implementation.
	Attempt uint
}
// Consumer handles one task delivered by Queue.Consume. It returns nothing;
// Run wraps each processor so that the consumer itself calls Ack on success or
// RetryLater on error.
type Consumer func(ctx context.Context, task Task)
// Queue represents a persistent task queue
type Queue interface {
	// Enqueue adds payload to the named queue.
	Enqueue(queue string, payload []byte) error
	// Consume registers consumer for tasks on the named queue.
	// NOTE(review): Run calls Consume twice in a row, which only works if it
	// does not block — confirm with the implementation.
	Consume(queue string, consumer Consumer)
	// Ack marks task as done; Run calls it when the processor returns nil.
	Ack(task Task)
	// RetryLater asks for task to be delivered again after duration; Run
	// passes calcBackoff(task).
	RetryLater(task Task, duration time.Duration)
}
// TransactionInput is the JSON payload of a money-transfer event: either the
// initial request or an activity result forwarded back by ProcessActivity.
// No json tags are declared, so the encoded keys are the Go field names.
type TransactionInput struct {
	// ReferenceID identifies the transaction and is the Persistence key.
	ReferenceID     string
	SourceAccountID string
	TargetAccountID string
	Amount          int
	// Used to report errors from activities: ProcessActivity sets it for
	// business errors, and ProcessMoneyTransferEvent branches on it.
	ErrorMessage string
	// Used to verify process consistency: ProcessMoneyTransferEvent compares it
	// with the persisted Status to detect stale or premature events.
	LastStatus Status
}
// ActivityInput is the JSON payload of a task on the activity queue.
type ActivityInput struct {
	// Type selects the operation in ProcessActivity: "deposit", "withdraw" or "refund".
	Type string
	// AccountID is the account the bank operation is applied to.
	AccountID string
	// Used to extract information about the transaction and forward it back to the transaction handler
	Transaction TransactionInput
}
// Worker holds the dependencies shared by the two task processors,
// ProcessMoneyTransferEvent and ProcessActivity.
type Worker struct {
	// queue is used to schedule activities, report results and dead-letter tasks.
	queue Queue
	// persistence holds the Status of each transaction.
	persistence Persistence
	// bank performs the actual withdraw/deposit operations.
	bank BankingService
}
// ProcessMoneyTransferEvent advances one transaction's state machine by a
// single step.
//
// The task payload is a JSON-encoded TransactionInput: either the initial
// transfer request or an activity result forwarded by ProcessActivity. The
// current Status is loaded from persistence and validated against
// input.LastStatus. The next activity (if any) is enqueued, and the new
// Status is committed with CompareAndSwap.
//
// Returning nil lets the caller acknowledge the task, including tasks that are
// deliberately discarded. Returning an error asks the caller to retry it later.
func (w *Worker) ProcessMoneyTransferEvent(ctx context.Context, task Task) error {
	var input TransactionInput
	if err := json.Unmarshal(task.Payload, &input); err != nil {
		log.Printf("Failed to unmarshal payload: %v", err)
		// Enqueue in dead letter queue for human inspection
		return moveToDeadLetterQueue(w.queue, task)
	}
	anyStatus, err := w.persistence.Load(ctx, input.ReferenceID)
	if err != nil {
		return err
	}
	status, ok := anyStatus.(Status)
	if !ok {
		log.Printf("Failed to load status from DB, got: %v", anyStatus)
		return nil // discard this task, we don't know what to do with it
	}
	if status == Failed || status == Succeeded {
		// The transaction already finished, so this is a late or duplicate
		// activity completion. This must be checked before the ordering checks
		// below: the terminal statuses are numerically lower than the
		// in-progress ones, so "status < input.LastStatus" would otherwise
		// retry this task forever.
		log.Printf("Transaction %s already completed with status %v, discarding task", input.ReferenceID, status)
		return nil
	}
	if status < input.LastStatus {
		// CompareAndSwap for the previous ProcessMoneyTransferEvent iteration has not completed before we got an activity completion.
		// Return an error and backoff retrying this task until our process is in a consistent state.
		return errors.New("got activity completion for uncommitted workflow state")
	}
	if status > input.LastStatus {
		log.Printf("Invalid status in task, got: %v, expected: %v", input.LastStatus, status)
		return nil // discard this task, we probably generated duplicate activities due to a crash before committing previous status
	}

	prevStatus := status
	var next *ActivityInput // activity to enqueue for the new status, if any
	switch status {
	case Started:
		status = Withdrawing
		next = &ActivityInput{Type: "withdraw", AccountID: input.SourceAccountID}
	case Withdrawing:
		if input.ErrorMessage != "" {
			// Could not withdraw, abort
			status = Failed
		} else {
			status = Depositing
			next = &ActivityInput{Type: "deposit", AccountID: input.TargetAccountID}
		}
	case Depositing:
		if input.ErrorMessage != "" {
			status = Refunding
			// Reset the error message
			input.ErrorMessage = ""
			next = &ActivityInput{Type: "refund", AccountID: input.SourceAccountID}
		} else {
			status = Succeeded
		}
	case Refunding:
		if input.ErrorMessage != "" {
			// Critical transaction failure, cannot refund.
			// In a real world example a human operator would probably need to examine this transaction.
			status = Failed
		} else {
			// NOTE(review): a successful refund is recorded as Succeeded even
			// though the money never reached the target account; consider a
			// distinct status.
			status = Succeeded
		}
	default:
		return nil // discard this task, unknown status. This shouldn't happen
	}

	if next != nil {
		// Stamp the forwarded transaction with the status the activity runs
		// under. ProcessActivity echoes it back and the checks above compare it
		// with the persisted status. Forwarding the previous status instead
		// made every completion look stale ("status > LastStatus"), so it was
		// discarded and the transfer stalled.
		input.LastStatus = status
		next.Transaction = input
		if err := scheduleActivity(w.queue, *next); err != nil {
			return err
		}
	}

	// Since we already enqueued tasks before storing the state we might generate duplicate tasks in case storage returns an error.
	// This is okay for our case because we use a unique transaction ID as an idempotency token.
	// Temporal prevents this situation by committing workflow state and the request to schedule an activity in a single transaction.
	// This API will fail in case the status was incremented concurrently.
	err = w.persistence.CompareAndSwap(ctx, input.ReferenceID, status, prevStatus)
	if err != nil && !errors.Is(err, ErrStorageConflict) {
		return err
	}
	return nil
}
// ProcessActivity performs the bank operation described by an ActivityInput
// task and reports the outcome back to the transaction's state machine.
//
// Business errors (ErrAccountNotFound, ErrInsufficientFunds) are recorded in
// the forwarded transaction's ErrorMessage. Any other bank error is returned so
// the task is retried later. Undecodable payloads and unknown activity types
// are moved to the dead letter queue, since retrying cannot fix them.
func (w *Worker) ProcessActivity(ctx context.Context, task Task) error {
	var input ActivityInput
	if err := json.Unmarshal(task.Payload, &input); err != nil {
		log.Printf("Failed to unmarshal payload: %v", err)
		return moveToDeadLetterQueue(w.queue, task)
	}
	tx := input.Transaction
	var operationErr error
	switch input.Type {
	case "deposit":
		_, operationErr = w.bank.Deposit(input.AccountID, tx.Amount, fmt.Sprintf("%s-deposit", tx.ReferenceID))
	case "withdraw":
		_, operationErr = w.bank.Withdraw(input.AccountID, tx.Amount, fmt.Sprintf("%s-withdraw", tx.ReferenceID))
	case "refund":
		_, operationErr = w.bank.Deposit(input.AccountID, tx.Amount, fmt.Sprintf("%s-refund", tx.ReferenceID))
	default:
		// An unknown activity type can never succeed, so returning an error
		// would retry it forever. Park it for human inspection instead.
		log.Printf("Unknown activity type %q", input.Type)
		return moveToDeadLetterQueue(w.queue, task)
	}
	if operationErr != nil {
		if !errors.Is(operationErr, ErrAccountNotFound) && !errors.Is(operationErr, ErrInsufficientFunds) {
			// Transient error, retry later
			return operationErr
		}
		// Business error, report back to transaction task
		tx.ErrorMessage = operationErr.Error()
	}
	payload, err := json.Marshal(tx)
	if err != nil {
		log.Printf("Failed to marshal payload: %v", err)
		// Enqueue in dead letter queue for human inspection
		return moveToDeadLetterQueue(w.queue, task)
	}
	// Report back on the queue Run consumes with ProcessMoneyTransferEvent.
	// The previous target, "transactions", is not consumed by Run, so results
	// never reached the state machine.
	return w.queue.Enqueue("money-transfer-events", payload)
}
// Run is the entry point for our program. It attaches the two processors to
// their queues. A processor returning nil gets its task acked; a non-nil
// error is logged and the task is rescheduled with calcBackoff.
func Run(queue Queue, persistence Persistence, bank BankingService) {
	w := Worker{queue: queue, persistence: persistence, bank: bank}

	withRetry := func(process func(context.Context, Task) error) Consumer {
		return func(ctx context.Context, t Task) {
			if err := process(ctx, t); err != nil {
				log.Printf("Failed to process task: %v", err)
				queue.RetryLater(t, calcBackoff(t))
				return
			}
			queue.Ack(t)
		}
	}

	queue.Consume("money-transfer-events", withRetry(w.ProcessMoneyTransferEvent))
	queue.Consume("money-transfer-activities", withRetry(w.ProcessActivity))
}
// scheduleActivity JSON-encodes input and enqueues it on the
// "money-transfer-activities" queue. Encoding and enqueue errors are returned
// unchanged.
func scheduleActivity(queue Queue, input ActivityInput) error {
	const activityQueue = "money-transfer-activities"
	encoded, marshalErr := json.Marshal(input)
	if marshalErr != nil {
		return marshalErr
	}
	return queue.Enqueue(activityQueue, encoded)
}
// calcBackoff calculates exponential backoff without jitter. The delay starts
// at 500ms, doubles with every attempt, and is capped at maxBackoff.
//
// Without the cap the delay grows without bound, and for large attempt counts
// the float64 value exceeds the range of time.Duration. Go leaves such a
// float-to-integer conversion implementation-dependent, which can yield
// negative or garbage delays.
func calcBackoff(task Task) time.Duration {
	const (
		initialInterval = 500 * time.Millisecond
		// maxBackoff matches the 100s MaximumInterval of the Temporal retry
		// policy used for the same transfer.
		maxBackoff = 100 * time.Second
	)
	backoff := float64(initialInterval) * math.Pow(2, float64(task.Attempt))
	// Compare in float64 before converting (also covers +Inf for huge attempts).
	if backoff > float64(maxBackoff) {
		return maxBackoff
	}
	return time.Duration(backoff)
}
// moveToDeadLetterQueue re-enqueues the task's raw payload on the dead-letter
// queue paired with its source queue: the source queue name plus "-DLQ".
func moveToDeadLetterQueue(queue Queue, task Task) error {
	deadLetterQueue := task.QueueName + "-DLQ"
	return queue.Enqueue(deadLetterQueue, task.Payload)
}
[Temporal gave us] lines of code reduction in key areas and ubiquitous integration patterns. Our engineers loved it. It brought our people closer together.
Chris Gavin, Architect
Gain transparency.
Assert control.
When a customer reports an issue, it's very easy for us to just put the Workflow ID into the Temporal Web UI to see what is going on.
Nicolas Gere, Software Engineer
Temporal works the
way you work
...in your language
...in your cloud
...and at your scale
Temporal executions have run since you’ve loaded this page.
100% open source
MIT license. Developed in the open.
Technologists love Temporal
HashiCorp needed to build long-running, reliable, fault-tolerant tasks for the HashiCorp Cloud Platform.
Temporal's technology satisfied all of these requirements out of the box and allowed our developers to focus on business logic.
Without Temporal's technology, we would've spent a significant amount of time rebuilding Temporal and would've very likely done a worse job.
Mitchell Hashimoto
Co-founder
One of the most interesting pieces of tech I've seen in years… temporal.io does to backend and infra, what React did to frontend… Temporal's engine is quite complex, much like React's, but the surface exposed to the developer is a beautiful "render()" function to organize your backend workflows.
Guillermo Rauch
CEO
Build invincible apps
Give your apps and services durable execution.