Temporal Python 1.0.0 – A Durable, Distributed Asyncio Event Loop
Chad Retz
We are excited about the GA release of the Temporal Python SDK. Python is now a fully-supported workflow language in Temporal, and our use of native asyncio constructs makes it a perfect fit for Python developers looking to write durable workflows.
Here are some links to read more about Temporal Python:
Like essentially all of Temporal, Temporal Python is MIT licensed, and open source contributions are very welcome. Join us in the #python-sdk channel in Temporal Slack or ask questions in the Community Forum.
Most of the details about how to use the SDK are in the aforementioned links, so this post will only give a high-level refresher of what Temporal and Temporal Python are. Then we will dive into some of the details of how Temporal Python leverages asyncio, along with some of its other features.
Intro to Temporal
Temporal is a workflow system that lets developers write workflows in code. Workflows run on any number of workers. Code in a workflow translates to events in Temporal. In an Event Sourcing approach, this means the workflow code can be replayed against those events on different workers as needed. Therefore workflow code must be deterministic so that it can safely rerun.
Workflows can call out to activities, which also run on workers and are general purpose functions that can do anything.
Clients can signal, query, cancel, and/or terminate workflows.
Since workflows routinely wait on timers, async tasks, managed coroutines, cancellations, etc., workflows are modeled well as Python asyncio event loops.
Intro to Temporal Python
To give a quick walkthrough of Temporal Python, we'll implement a simplified form of one-click buying. A purchase is started and then, unless cancelled, is performed after 10 seconds.
Implementing an Activity
First, let's create a simple activity that does an HTTP POST of a purchase via aiohttp:
```python
import aiohttp
from dataclasses import dataclass, asdict
from temporalio import activity
from temporalio.exceptions import ApplicationError


@dataclass
class Purchase:
    item_id: str
    user_id: str


@activity.defn
async def do_purchase(purchase: Purchase) -> None:
    async with aiohttp.ClientSession() as sess:
        async with sess.post("https://api.example.com/purchase", json=asdict(purchase)) as resp:
            # We don't want to retry client failures (4xx)
            if 400 <= resp.status < 500:
                # resp.json() is a coroutine in aiohttp, so it must be awaited
                raise ApplicationError(f"Status: {resp.status}", await resp.json(), non_retryable=True)
            # Otherwise, fail on a bad status, which will be retried automatically
            resp.raise_for_status()
```
Implementing a Workflow
Now we want to execute that activity from a workflow after 10 seconds, unless we receive a cancellation:
```python
import asyncio
from datetime import timedelta
from enum import IntEnum
from typing import Optional

from temporalio import workflow

# Import our activity, but pass it through the sandbox
with workflow.unsafe.imports_passed_through():
    from .my_activities import Purchase, do_purchase


class PurchaseStatus(IntEnum):
    PENDING = 1
    CONFIRMED = 2
    CANCELLED = 3
    COMPLETED = 4


@workflow.defn
class OneClickBuyWorkflow:
    def __init__(self) -> None:
        self.status = PurchaseStatus.PENDING
        self.purchase: Optional[Purchase] = None

    @workflow.run
    async def run(self, purchase: Purchase) -> PurchaseStatus:
        # Keep a purchase that a signal may already have set; otherwise use the input
        self.purchase = self.purchase or purchase

        # Give user 10 seconds to cancel or update before we send it through
        try:
            await asyncio.sleep(10)
        except asyncio.CancelledError:
            self.status = PurchaseStatus.CANCELLED
            return self.status

        # Update status, purchase, and update status again
        self.status = PurchaseStatus.CONFIRMED
        await workflow.execute_activity(
            do_purchase,
            self.purchase,
            start_to_close_timeout=timedelta(minutes=1),
            cancellation_type=workflow.ActivityCancellationType.WAIT_CANCELLATION_COMPLETED,
        )
        self.status = PurchaseStatus.COMPLETED
        return self.status

    @workflow.signal
    def update_purchase(self, purchase: Purchase) -> None:
        self.purchase = purchase

    @workflow.query
    def current_status(self) -> PurchaseStatus:
        return self.status
```
See the asyncio.sleep in there? That's no normal local-process sleep; that's a durable timer backed by Temporal. See the "Temporal Workflows are Asyncio Event Loops" section later.
Running a Worker
Workflows and activities are run in workers like so:
```python
import asyncio
from temporalio.client import Client
from temporalio.worker import Worker
from .my_workflows import OneClickBuyWorkflow
from .my_activities import do_purchase


async def main() -> None:
    # Create and run a worker on a task queue for the workflow and activity
    worker = Worker(
        await Client.connect("my.temporal.host:7233"),
        task_queue="my-task-queue",
        workflows=[OneClickBuyWorkflow],
        activities=[do_purchase],
    )
    await worker.run()


if __name__ == "__main__":
    asyncio.run(main())
```
Using a Client
Now we can start the workflow, send it a signal, check its status, etc.:
```python
import asyncio
from temporalio.client import Client
from .my_activities import Purchase
from .my_workflows import OneClickBuyWorkflow, PurchaseStatus


async def main() -> None:
    # Create the client
    client = await Client.connect("my.temporal.host:7233")
    # Start a workflow
    handle = await client.start_workflow(
        OneClickBuyWorkflow.run,
        Purchase(item_id="item1", user_id="user1"),
        id="user1-purchase1",
        task_queue="my-task-queue",
    )
    # We can cancel it if we want, then wait for the workflow to finish
    await handle.cancel()
    await handle.result()
    # We can query its status, even if the workflow is complete
    status = await handle.query(OneClickBuyWorkflow.current_status)
    assert status == PurchaseStatus.CANCELLED
    # We can do many other things with the client like sending a signal to update
    # the purchase, terminating the workflow, etc.


if __name__ == "__main__":
    asyncio.run(main())
```
This only scratches the surface of what can be done with Temporal Python. See the Python SDK project for more details.
Now that we've given a brief overview of Temporal Python, let's discuss how it leverages asyncio.
Temporal Workflows are Asyncio Event Loops
Here we'll describe asyncio and how Temporal leverages it.
Asyncio Behind the Scenes
When running the following in a Python async def function, what happens?
```python
await asyncio.sleep(10)
```
It sleeps for 10 seconds. But asyncio.sleep is not like time.sleep: it does not block a thread. Instead, it is more like a JS setTimeout (or a Go time.Sleep or a Rust Tokio sleep, etc.) in that it instructs the underlying scheduler or event loop to yield to other work and only resume after that time has passed.
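To see the difference from time.sleep, here is a small sketch using only the standard library, with no Temporal code. Two coroutines each sleep 0.2 seconds concurrently. Because asyncio.sleep yields to the event loop instead of blocking the thread, the total wall time is roughly 0.2 seconds rather than 0.4. The names and delays are arbitrary choices for illustration.

```python
import asyncio
import time
from typing import List, Tuple


async def napper(name: str, delay: float) -> str:
    # asyncio.sleep yields control to the event loop instead of blocking the thread
    await asyncio.sleep(delay)
    return name


async def main() -> Tuple[List[str], float]:
    start = time.monotonic()
    # Both sleeps are pending on the loop at the same time
    names = await asyncio.gather(napper("a", 0.2), napper("b", 0.2))
    return list(names), time.monotonic() - start


names, elapsed = asyncio.run(main())
print(names, f"{elapsed:.2f}s")
```

With time.sleep in place of asyncio.sleep, the two calls would run back to back and take at least 0.4 seconds.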
In Python as of this writing, here's the core of sleep(delay, result=None) for a positive delay:
```python
loop = events.get_running_loop()
future = loop.create_future()
h = loop.call_later(delay,
                    futures._set_result_unless_cancelled,
                    future, result)
try:
    return await future
finally:
    h.cancel()
```
So it obtains the current event loop, creates a future on it, and then tells the event loop, via call_later, to invoke the future's result setter after the given delay. How call_later is implemented is up to the event loop. The default event loop just puts a timer on a heap of scheduled callbacks, which is checked against the event loop's "time" on each event loop iteration.
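The same mechanism can be rebuilt with public APIs only. The sketch below is a hypothetical my_sleep, not CPython's code. It creates a future on the running loop, asks the loop to resolve it via call_later, and cancels the timer handle on the way out. The helper mirrors the private futures._set_result_unless_cancelled, but it is our own copy.

```python
import asyncio
from typing import Any


def _set_result_unless_cancelled(fut: asyncio.Future, result: Any) -> None:
    # Our own copy of the idea behind CPython's private helper: skip cancelled futures
    if not fut.cancelled():
        fut.set_result(result)


async def my_sleep(delay: float, result: Any = None) -> Any:
    loop = asyncio.get_running_loop()
    future = loop.create_future()
    # Ask the loop to resolve the future after `delay` seconds
    handle = loop.call_later(delay, _set_result_unless_cancelled, future, result)
    try:
        return await future
    finally:
        # If we were cancelled first, make sure the timer callback never runs
        handle.cancel()


print(asyncio.run(my_sleep(0.05, "woke up")))  # prints "woke up"
```

Because everything goes through loop.call_later, an event loop that implements call_later differently also changes what "sleeping" means. That is the hook Temporal relies on later in this post.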
Asyncio event loops are simply classes that implement asyncio.AbstractEventLoop. In most cases, developers will use asyncio.run(my_async_func()) or similar, which creates the default event loop for the system and calls run_until_complete on it. By default, run_until_complete is implemented as run_forever, with stop invoked when the task is done.
So how is run_forever implemented? Exactly as one might expect for an event loop: with a loop:
```python
while True:
    self._run_once()
    if self._stopping:
        break
```
This just repeatedly calls _run_once, which processes everything that is ready (e.g. futures that have become ready, timers that have passed their time, etc.) and otherwise waits to be woken up by the underlying system to run another iteration.
So, an event loop is just a triggered "loop" that executes all ready tasks until all are yielded and then waits to be triggered again.
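A toy version of that "triggered loop" fits in a few lines. ToyLoop below is hypothetical and far simpler than asyncio's real loop. It keeps a queue of ready callbacks, and each run_once call executes only the callbacks that were ready when the iteration began, much like CPython's _run_once. Anything scheduled during an iteration waits for the next trigger.

```python
from collections import deque
from typing import Callable, Deque, List


class ToyLoop:
    """A toy, hypothetical 'triggered' loop; not asyncio's real implementation."""

    def __init__(self) -> None:
        self._ready: Deque[Callable[[], None]] = deque()

    def call_soon(self, callback: Callable[[], None]) -> None:
        self._ready.append(callback)

    def run_once(self) -> int:
        # Only run callbacks that were ready when this iteration began;
        # anything they schedule waits for the next trigger.
        ntodo = len(self._ready)
        for _ in range(ntodo):
            self._ready.popleft()()
        return ntodo


log: List[str] = []
loop = ToyLoop()
loop.call_soon(lambda: log.append("first"))
loop.call_soon(lambda: loop.call_soon(lambda: log.append("scheduled by first iteration")))
print(loop.run_once(), log)  # 2 ['first']
print(loop.run_once(), log)  # 1 ['first', 'scheduled by first iteration']
```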
Temporal's Asyncio Event Loop
Workflows work similarly in all Temporal SDK languages. When triggered, they run all coroutines individually and cooperatively until every one has yielded, waiting on updates from the Temporal server. Then we send the commands collected during that iteration to the server. Once the server sends us events back (e.g. a completed timer, a completed activity, a received signal, etc.), we populate the values the coroutines are waiting on and trigger the workflow to do the same processing again.
So, a Temporal workflow is just a triggered "loop" that executes all ready tasks until all are yielded and then waits to be triggered again. Just like an event loop. Therefore it feels very natural to develop Temporal workflow coroutines as asyncio coroutines.
We have created our own implementation of asyncio.AbstractEventLoop, which is set as the currently running loop for Temporal workflows. We don't even need to implement run_until_complete or run_forever, because we only ever want to run one iteration of the event loop at a time.
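The activation cycle described above can be sketched without Temporal by driving a coroutine by hand. This is only an illustration: DurableTimer and ToyWorkflowDriver are hypothetical names, and the driver handles a single coroutine. The real SDK instead runs many coroutines through its own AbstractEventLoop implementation. Each activate call resumes the workflow with an "event", runs it until it yields again, and returns the commands it produced.

```python
from typing import Any, Coroutine, Generator, List, Optional, Tuple

Command = Tuple[str, Any]


class DurableTimer:
    """Hypothetical awaitable: yields a 'start_timer' command to whoever drives the coroutine."""

    def __init__(self, seconds: float) -> None:
        self.seconds = seconds

    def __await__(self) -> Generator[Command, Any, Any]:
        # The value sent back in is whatever the "server" reported for this timer
        fired = yield ("start_timer", self.seconds)
        return fired


class ToyWorkflowDriver:
    """Runs one workflow coroutine a single step per activation (no real server)."""

    def __init__(self, coro: Coroutine[Any, Any, Any]) -> None:
        self._coro = coro
        self.result: Optional[Any] = None

    def activate(self, event: Any = None) -> List[Command]:
        # Resume with the event, run until the coroutine yields, return its commands
        try:
            return [self._coro.send(event)]
        except StopIteration as stop:
            self.result = stop.value
            return [("complete_workflow", stop.value)]


async def my_workflow() -> str:
    await DurableTimer(10)
    await DurableTimer(5)
    return "done"


driver = ToyWorkflowDriver(my_workflow())
print(driver.activate())                 # [('start_timer', 10)]
print(driver.activate("timer 1 fired"))  # [('start_timer', 5)]
print(driver.activate("timer 2 fired"))  # [('complete_workflow', 'done')]
```

Between activations, nothing runs and no real time passes. Time in the workflow only advances when the driver reports that a timer fired.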
Durability
When we sleep or start a task in normal Python asyncio code and the process crashes, our state and our ability to pick up where we left off are lost. Temporal workflows, however, are built to be durable and resumable. Since workflow code is deterministic and all of these constructs are durable, a worker or process crash is no problem: Temporal replays the existing events through the code to get back to where it left off. Another worker can easily resume the workflow with no data or functionality loss. Once a workflow is accepted by Temporal, it is ensured to run to completion.
Timers
Asyncio timers are built as Temporal timers. For example, inside a workflow if we run:
```python
await asyncio.sleep(10)
```
We are actually starting a Temporal durable timer, which will be visible in the UI. The same is true for everything that ends up calling call_later. So if we have:
```python
await asyncio.wait_for(workflow.execute_child_workflow(MyChildWorkflow.run), timeout=10)
```
We have started a child workflow and a 10-second timer, which behaves just like Python does. If the 10-second timeout elapses before the child workflow completes, the child workflow is cancelled and an asyncio.TimeoutError is raised.
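Here is that plain-Python behavior in isolation, as a stdlib-only sketch with no Temporal involved. slow_child is a hypothetical stand-in for a long-running child workflow. When wait_for times out, it cancels the inner awaitable, so the child sees asyncio.CancelledError, and then wait_for raises asyncio.TimeoutError to the caller.

```python
import asyncio
from typing import Dict


async def slow_child(state: Dict[str, bool]) -> str:
    # Hypothetical stand-in for a long-running child workflow
    try:
        await asyncio.sleep(10)
        return "finished"
    except asyncio.CancelledError:
        state["cancelled"] = True
        raise


async def main() -> Dict[str, bool]:
    state = {"cancelled": False, "timed_out": False}
    try:
        await asyncio.wait_for(slow_child(state), timeout=0.05)
    except asyncio.TimeoutError:
        state["timed_out"] = True
    return state


print(asyncio.run(main()))  # {'cancelled': True, 'timed_out': True}
```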
We can even get the current loop time via:
```python
asyncio.get_running_loop().time()
```
This is effectively the same as workflow.now() (as a float of seconds rather than a datetime), and it shows that the event loop is at a deterministic point in time. We can even use a time-skipping test workflow environment and control the time manually.
Tasks
Creating a task on the event loop is the proper way to run coroutines concurrently in a Temporal workflow. So we can easily have something like:
```python
async with asyncio.TaskGroup() as tg:
    for activity_param in range(20):
        tg.create_task(
            workflow.execute_activity(
                my_activity,
                activity_param,
                start_to_close_timeout=timedelta(minutes=5),
            ),
        )
```
This uses the new TaskGroup functionality in Python 3.11 to start 20 activities and wait for them all to complete.
In fact, Temporal Python was developed before task groups were made available, but since we leverage native asyncio constructs, newer features automatically work.
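On Python versions before 3.11, the same fan-out can be written with asyncio.gather. This stdlib-only sketch uses a hypothetical fake_activity coroutine in place of workflow.execute_activity. It shows only the shape of the code and is not a Temporal API.

```python
import asyncio
from typing import List


async def fake_activity(param: int) -> int:
    # Hypothetical stand-in for workflow.execute_activity(my_activity, param, ...)
    await asyncio.sleep(0.01)
    return param * 2


async def fan_out() -> List[int]:
    # Start all 20 at once and wait for every result (returned in input order)
    return list(await asyncio.gather(*(fake_activity(p) for p in range(20))))


results = asyncio.run(fan_out())
print(results[:5])  # [0, 2, 4, 6, 8]
```

One design difference applies when something fails. A TaskGroup cancels the remaining tasks if one of them raises. With default arguments, gather propagates the first exception without cancelling the others.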
Activity handles and child workflow handles are actually implemented as extensions of asyncio tasks, so a started activity or child workflow can be used as a task. This is acceptable:
```python
handle = workflow.start_activity(my_activity, start_to_close_timeout=timedelta(minutes=5))
handle.set_name("my-task-name-for-stack-traces")
workflow.logger.info("Result: %s", await handle)
```
Cancellation
Luckily for us, Python cancellation and Temporal cancellation work almost exactly the same. Both an asyncio cancellation and a Temporal cancellation are merely requests that raise an exception in the underlying code, and that exception can be caught and ignored. This means that if we cancel a workflow from a client, Temporal relays that to the workflow as a task cancellation, which causes an asyncio.CancelledError to be raised.
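The catch-and-return pattern in OneClickBuyWorkflow is ordinary asyncio, as this stdlib-only analogue shows. one_click_buy is a hypothetical plain coroutine, and task.cancel() plays the role of handle.cancel() from a Temporal client. Because the coroutine catches CancelledError and returns, awaiting the task yields a normal result.

```python
import asyncio


async def one_click_buy(confirm_delay: float) -> str:
    # Plain-asyncio analogue of OneClickBuyWorkflow.run (no Temporal involved)
    try:
        await asyncio.sleep(confirm_delay)
    except asyncio.CancelledError:
        # Cancellation is just an exception; catching it lets us return normally
        return "CANCELLED"
    return "COMPLETED"


async def main() -> str:
    task = asyncio.create_task(one_click_buy(10))
    await asyncio.sleep(0.01)  # let the task start sleeping
    task.cancel()  # analogous to handle.cancel() from a Temporal client
    return await task


print(asyncio.run(main()))  # CANCELLED
```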
We can even use shielding to ensure something like a really important activity cannot be cancelled, for example:
```python
await asyncio.shield(workflow.execute_activity(
    do_not_cancel_me,
    start_to_close_timeout=timedelta(minutes=5),
))
```
Note that shielding does not stop the outer await from raising asyncio.CancelledError. If that error goes uncaught, it cancels the workflow, which cancels the activity anyway. See the Python shielding docs on how to ignore cancellation altogether.
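The Python docs pair shield with try/except to ignore cancellation completely. Below is a stdlib-only sketch of that pattern, where important_work is a hypothetical stand-in for the do_not_cancel_me activity. We keep a reference to the inner task and shield it. If the outer await is cancelled, we catch the error and wait for the inner task anyway.

```python
import asyncio


async def important_work() -> str:
    # Hypothetical stand-in for the do_not_cancel_me activity
    await asyncio.sleep(0.05)
    return "important result"


async def caller() -> str:
    inner = asyncio.create_task(important_work())
    try:
        return await asyncio.shield(inner)
    except asyncio.CancelledError:
        # The outer await was cancelled, but the shielded task kept running;
        # wait for it anyway instead of letting cancellation propagate.
        return await inner


async def main() -> str:
    outer = asyncio.create_task(caller())
    await asyncio.sleep(0.01)
    outer.cancel()
    return await outer


print(asyncio.run(main()))  # important result
```

The Python docs call fully ignoring cancellation "not recommended", and the same caution seems sensible in a workflow.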
This means task cancellation also works. Say we want to be able to cancel a group of activities after 3 seconds:
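```python
# asyncio.gather already returns a cancellable future; wrapping it in
# asyncio.create_task would raise TypeError, since create_task needs a coroutine
multiple_activities = asyncio.gather(
    workflow.execute_activity(my_activity1, start_to_close_timeout=timedelta(minutes=5)),
    workflow.execute_activity(my_activity2, start_to_close_timeout=timedelta(minutes=5)),
)
await asyncio.sleep(3)
# Cancelling the gather future cancels every activity that hasn't finished yet
multiple_activities.cancel()
try:
    await multiple_activities
except asyncio.CancelledError:
    workflow.logger.info("Activities were cancelled")
```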
This actually issues durable cancellation requests to the activities wherever they may be running if they have started.
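The group-cancellation semantics come straight from asyncio.gather. This stdlib-only sketch uses a hypothetical tracked coroutine as a stand-in for an activity, and each child records when it receives cancellation. After the gather future is cancelled, both children have seen CancelledError by the time awaiting the group raises.

```python
import asyncio
from typing import List


async def tracked(name: str, delay: float, cancelled: List[str]) -> str:
    # Hypothetical stand-in for an activity; records when it receives cancellation
    try:
        await asyncio.sleep(delay)
        return name
    except asyncio.CancelledError:
        cancelled.append(name)
        raise


async def main() -> List[str]:
    cancelled: List[str] = []
    group = asyncio.gather(tracked("a", 10, cancelled), tracked("b", 10, cancelled))
    await asyncio.sleep(0.01)  # let both children start
    group.cancel()  # cancels every child still running
    try:
        await group
    except asyncio.CancelledError:
        pass
    return sorted(cancelled)


print(asyncio.run(main()))  # ['a', 'b']
```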
Synchronization Primitives
We can also use most deterministic asyncio synchronization primitives. For example, it is very common to use asyncio.Queue or asyncio.Event. There are often workflows written like:
```python
@workflow.defn
class MyWorkflow:
    def __init__(self) -> None:
        self.should_proceed = asyncio.Event()

    @workflow.run
    async def run(self) -> None:
        workflow.logger.info("Waiting...")
        await self.should_proceed.wait()
        workflow.logger.info("Completing!")

    @workflow.signal
    def proceed(self) -> None:
        self.should_proceed.set()
```
Then a client may signal the workflow to proceed.
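The same shape works in plain asyncio. In this stdlib-only sketch, EventWorkflow is a hypothetical class with no Temporal decorators. A loop.call_later callback stands in for the client's signal by calling proceed shortly after run starts waiting.

```python
import asyncio
from typing import List


class EventWorkflow:
    """Plain-asyncio analogue of MyWorkflow; proceed() plays the role of the signal."""

    def __init__(self) -> None:
        self.should_proceed = asyncio.Event()
        self.log: List[str] = []

    async def run(self) -> None:
        self.log.append("Waiting...")
        await self.should_proceed.wait()
        self.log.append("Completing!")

    def proceed(self) -> None:
        self.should_proceed.set()


async def main() -> List[str]:
    wf = EventWorkflow()
    # Simulate a client sending the signal shortly after the run starts
    asyncio.get_running_loop().call_later(0.01, wf.proceed)
    await wf.run()
    return wf.log


print(asyncio.run(main()))  # ['Waiting...', 'Completing!']
```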
Wait Condition
Since we control each event loop iteration, we can do more advanced things than normal Python code. For example, we offer a wait_condition that does not return until a callback is true. We invoke that callback on each iteration. So the same signal example above could be written as:
```python
@workflow.defn
class MyWorkflow:
    def __init__(self) -> None:
        self.should_proceed = False

    @workflow.run
    async def run(self) -> None:
        workflow.logger.info("Waiting...")
        await workflow.wait_condition(lambda: self.should_proceed)
        workflow.logger.info("Completing!")

    @workflow.signal
    def proceed(self) -> None:
        self.should_proceed = True
```
This type of feature isn't normally available in Python asyncio loops because there is not an easy way to run a callback every iteration of the event loop.
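To see why this is awkward in plain asyncio, here is a hypothetical approximation, poll_wait_condition, built only from the standard library. It is not how Temporal implements wait_condition. It re-checks the callback after every asyncio.sleep(0), which busy-polls and keeps the loop spinning. Temporal's loop can instead evaluate the callback once per iteration it already controls.

```python
import asyncio
from typing import Callable


async def poll_wait_condition(fn: Callable[[], bool]) -> None:
    """Hypothetical plain-asyncio approximation: re-check fn on every loop pass.

    asyncio.sleep(0) just yields so other callbacks can run; unlike Temporal's
    loop, this busy-polls and keeps the loop spinning until fn() is true.
    """
    while not fn():
        await asyncio.sleep(0)


class ConditionWorkflow:
    def __init__(self) -> None:
        self.should_proceed = False

    async def run(self) -> str:
        await poll_wait_condition(lambda: self.should_proceed)
        return "Completing!"

    def proceed(self) -> None:
        self.should_proceed = True


async def main() -> str:
    wf = ConditionWorkflow()
    # Simulate the signal arriving shortly after the run starts
    asyncio.get_running_loop().call_later(0.01, wf.proceed)
    return await wf.run()


print(asyncio.run(main()))  # Completing!
```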
Limitations
Only deterministic asyncio constructs are implemented, so anything relating to subprocesses, disk/network IO, etc. will fail if invoked.
Currently, only relative-time-based functionality is implemented in the Temporal event loop, so using call_at will fail. Unfortunately, the newer high-level asyncio.timeout (Python 3.11+) is implemented via call_at even though it takes a relative time, so it cannot be used yet either. We may one day remove this limitation and allow absolute times relative to workflow time, but doing so can lead to confusing code for users.
Other Temporal Python Features
While this post focused on how Temporal workflows are durable asyncio event loops, there are some other interesting aspects about Temporal Python worth noting.
Fully Typed
The entire library is fully typed with the latest Python type hinting capabilities. The library was designed with typing in mind and many calls were developed as generics to help developers catch mistakes. For example, given a workflow:
```python
@workflow.defn
class MyWorkflow:
    @workflow.run
    async def run(self, param: int) -> None:
        ...
```
MyPy (or other type checkers) would report an error if we did:
```python
await my_client.execute_workflow(MyWorkflow.run, "some param", id="id", task_queue="tq")
```
This fails type checking because the workflow's run method takes an integer argument, not a string. This type safety is prevalent throughout the library. See the API documentation for all of the types and overloads.
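The trick behind this is ordinary generics. Below is a minimal sketch, assuming nothing about temporalio's real overloads, which differ and cover far more cases. In the hypothetical execute_typed, one TypeVar ties the argument to the callable's parameter type and another carries the return type through. A type checker can then reject a mismatched call even though nothing is checked at runtime.

```python
import asyncio
from typing import Awaitable, Callable, TypeVar

ParamT = TypeVar("ParamT")
ReturnT = TypeVar("ReturnT")


async def execute_typed(
    fn: Callable[[ParamT], Awaitable[ReturnT]], arg: ParamT
) -> ReturnT:
    """Hypothetical helper: the generics tie `arg` to fn's parameter type."""
    return await fn(arg)


async def double(param: int) -> int:
    return param * 2


print(asyncio.run(execute_typed(double, 21)))  # 42
# execute_typed(double, "some param")  # a type checker such as mypy should flag this
```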
Multiple Activity Styles
We have only shown async def activities, which are the most common and recommended way to develop activities. But many Python libraries are not async, and calling blocking code from an async context will block the event loop.
Developers can use run_in_executor from their async activity if they'd like (and this is a very common approach), but we also support non-async activities. For threaded activities, a worker can be given a ThreadPoolExecutor to run activities. We even support multiprocess activities with a ProcessPoolExecutor. Extra effort was made to support activity heartbeating and cancellation of activities across threads and even processes. See the repository documentation for more info.
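Here is a stdlib-only sketch of the run_in_executor approach. The function names are hypothetical, and a real activity would also carry @activity.defn, which is left off so the example runs without temporalio. A blocking call runs on a worker thread, and a ticker coroutine keeps running meanwhile, which shows the event loop was never blocked.

```python
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor
from typing import Tuple


def blocking_lookup(item_id: str) -> str:
    # Hypothetical stand-in for a blocking (non-async) library call
    time.sleep(0.1)
    return f"details for {item_id}"


async def async_activity_body(item_id: str) -> str:
    loop = asyncio.get_running_loop()
    with ThreadPoolExecutor(max_workers=1) as pool:
        # The blocking call runs on a thread; the event loop stays free
        return await loop.run_in_executor(pool, blocking_lookup, item_id)


async def main() -> Tuple[str, int]:
    ticks = 0

    async def ticker() -> None:
        nonlocal ticks
        while True:
            await asyncio.sleep(0.01)
            ticks += 1

    tick_task = asyncio.create_task(ticker())
    result = await async_activity_body("item1")
    tick_task.cancel()
    return result, ticks


result, ticks = asyncio.run(main())
print(result)  # details for item1
```

Creating an executor per call keeps the sketch short. A long-lived, shared executor would be the more typical choice.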
Workflow Sandboxing
All workflows run in a sandbox by default. For each workflow run, the sandbox essentially re-imports the file the workflow is in to ensure there is no global state pollution. It also proxies known non-deterministic standard library calls to prevent accidents like accessing the disk or using random inside a workflow. Most non-standard-library imports should be marked as passed through at import time, e.g.:
```python
with workflow.unsafe.imports_passed_through():
    import pydantic
```
This keeps them from being re-imported, which saves memory and improves performance. The sandbox is neither foolproof nor secure. There are some known caveats, and the sandbox can be disabled per workflow or for the entire worker. See the repository documentation for more details.
There will be a future blog post on how the sandbox works in detail and how we built it.
Rust Core
Both this Python SDK and the TypeScript SDK (and the upcoming Ruby and .NET SDKs) are backed by the same Rust core. Unlike many "SDKs", the Temporal SDKs are not just simple smart clients. Rather, they are entire complex state machines. In Temporal, users run the workers and all workflow and activity code happens on the worker, so advanced machinations are needed to ensure these run properly.
In Python we use PyO3 and PyO3 Asyncio with some custom Rust bridge code to make this work with the Rust core. This means code not only runs fast but also automatically incorporates state machine fixes and improvements as the core is improved.
Also see the post Why Rust powers Temporal's new Core SDK.
Replaying
One of the most powerful yet often overlooked features of Temporal is the ability to replay workflow code using historical runs. We can fetch workflow histories from the Temporal server and run them on our local workflow code using a replayer.
Replaying past workflows on newer workflow code can help catch incompatible/non-deterministic changes and other unexpected bugs before deployment.
Also, replaying a past workflow history on local workflow code allows us to debug that code. Simply disable the deadlock detector on the replayer via debug_mode=True, and we can put breakpoints in our workflow code and step through a workflow exactly as it originally ran.
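To make the non-determinism check concrete, here is a toy replayer built only from the standard library. Emit, run_and_record, and replay are all hypothetical. Temporal's actual replayer (temporalio.worker.Replayer) works against full event histories and has its own rules for what counts as a mismatch. This toy simply records the commands one version of the workflow emits and requires a new version to emit the same sequence of command kinds.

```python
from typing import Any, Callable, Coroutine, Generator, List, Tuple

Command = Tuple[str, Any]


class Emit:
    """Hypothetical awaitable that hands a (kind, detail) command to the driver."""

    def __init__(self, kind: str, detail: Any) -> None:
        self.command: Command = (kind, detail)

    def __await__(self) -> Generator[Command, Any, Any]:
        return (yield self.command)


def run_and_record(coro: Coroutine[Any, Any, Any]) -> List[Command]:
    """Drive a workflow coroutine to completion, resolving every command immediately."""
    commands: List[Command] = []
    try:
        while True:
            commands.append(coro.send(None))
    except StopIteration as stop:
        commands.append(("complete_workflow", stop.value))
    return commands


def replay(workflow_fn: Callable[[], Coroutine[Any, Any, Any]], history: List[Command]) -> None:
    """Toy replayer: new code must produce the same sequence of command kinds."""
    replayed = run_and_record(workflow_fn())
    for index, (old, new) in enumerate(zip(history, replayed)):
        if old[0] != new[0]:
            raise RuntimeError(f"Non-determinism at command {index}: expected {old[0]}, got {new[0]}")
    if len(history) != len(replayed):
        raise RuntimeError("Non-determinism: number of commands changed")


async def workflow_v1() -> str:
    await Emit("start_timer", 10)
    return "done"


async def workflow_v2() -> str:
    # New step inserted before the timer: incompatible with histories from v1
    await Emit("schedule_activity", "send_email")
    await Emit("start_timer", 10)
    return "done"


history = run_and_record(workflow_v1())
replay(workflow_v1, history)  # unchanged code replays cleanly
try:
    replay(workflow_v2, history)
except RuntimeError as err:
    print(err)  # Non-determinism at command 0: expected start_timer, got schedule_activity
```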
Test Environments
Temporal Python offers two test server implementations to make testing easy.
A full Temporal server running locally on SQLite can be started via await temporalio.testing.WorkflowEnvironment.start_local(). This downloads an executable into a temporary location if it is not already present, and starts it rather quickly. Since it is a full, single-binary Temporal server, it has all the features a standard Temporal server has. The UI can even be enabled.
We also provide a time-skipping server implementation.
await temporalio.testing.WorkflowEnvironment.start_time_skipping() starts a server that can skip time to the next workflow event. Time can also be skipped manually. This is the perfect solution for testing workflows that sleep for a long time, or for checking how a timeout may affect the system. Like the full Temporal server, this one downloads a binary lazily on first call and then executes it. It is actually written in Java as part of our Java SDK and compiled natively via GraalVM.
Wrap-up
So that's a basic introduction to Temporal Python, how it tightly integrates with asyncio, and some of its other features. There are many more features not covered in this post.
Check out the repository and give it a try! Also, join us in the #python-sdk channel in Temporal Slack, on the Community Forum, on Twitter, etc. Our community has meetups, workshops, conferences, and more. In particular, our YouTube Channel has lots of good content from past presentations.