Python SDK: Diving into Workers and Workflows
Matt Bernier
In our previous post, we announced that we have our Python SDK in GA and shared a little bit of history around why we built what we did and some prerequisites you needed to get going. Now, let’s dive in a bit deeper to get you past up and running. For the purposes of this post, we are going to break down the single file shown in our hello_activity.py example and split it into multiple files so we more closely match what your application might look like!
The Workflow Code: workflow.py
We want to create the workflow first, because the worker class is going to call this. If we don’t do this first, then your code won’t run, and that isn’t any fun.
The top of our file, like any good python file, has the imports that we need to make this thing go. We are also defining a python dataclass which will be the object we use to pass data to the workflow. By passing a python dataclass instead of multiple parameters to the workflow, we can add or remove fields from the dataclass instead of modifying how the workflow is called. This allows some flexibility to make future changes without needing to version the workflow.
In this case, ComposeGreetingInput
allows us to pass a greeting
string and a name
string into the workflow.
from dataclasses import dataclass
from datetime import timedelta
from temporalio import activity, workflow
# Temporal strongly encourages using a single dataclass so
# that you can add fields in a backwards-compatible way.
@dataclass
class ComposeGreetingInput:
greeting: str
name: str
Just below ComposeGreetingInput
we are going to add two new pieces of code, first is an activity.
The Temporal Python SDK comes with decorators that do some work for you. In the case of an activity, it registers this method as a valid activity that can run by the Worker (see below). An activity takes in some data, processes it, and can return values. (in effect, it’s just like any other method). The one thing that we should call out, is that if you’re going to do anything that is non-deterministic, like generating a UUID, you want to do that in your activities or the methods that your activities call out to.
compose_greeting
takes in the ComposeGreetingInput
dataclass object that we defined above, logs some information to the terminal, and then returns concatenated strings from the object passed in. We could just as easily have pushed data to a database, done some computation, or even made a call to a third-party API in this method.
# Basic activity that logs and does string concatenation
@activity.defn
async def compose_greeting(input: ComposeGreetingInput) -> str:
activity.logger.info("Running activity with parameter %s" % input)
return f"{input.greeting}, {input.name}!"
Now that we have an activity, we are going to add in our Workflow class. This code is the backbone for your Temporal script. Everything that Temporal will run for you in this example is inside a Workflow class.
We decorate the class with workflow.defn
to register this properly, so Temporal Worker (see below) knows that this is a valid Workflow Class.
The first method in this example class is run
, but there are a couple things to call out:
The run
method is decorated with @workflow.run
, which tells the Worker that this is the method to run when we start a workflow. It can be used to set up the workflow variables, call out to one or more activities, and so many more things. To learn more, check out:
- Dev guide:
t.mp/py
- API reference:
python.temporal.io
The method itself is an async method, because Temporal’s python library uses asyncio under the hood and the methods called need to be able to be run asynchronously.
The workflow run
method expects a string name
to be passed when it is called by the Worker
Within the method, we are calling Temporals workflow.execute_activity
method, which takes:
- An activity method reference, in this case
compose_greeting
- Arguments, which in this case we are using a feature of dataclasses to pass in the greeting “Hello” and the name
name
to theComposeGreetingInput
dataclass we created earlier. - A timeout. In this case, we’re providing
start_to_close_timeout
, which tells Temporal Server to time out this activity 10 seconds from whenever the activity starts.
# Basic workflow that logs and invokes an activity
@workflow.defn
class GreetingWorkflow:
@workflow.run
async def run(self, name: str) -> str:
workflow.logger.info("Running workflow with parameter %s" % name)
return await workflow.execute_activity(
compose_greeting,
ComposeGreetingInput("Hello", name),
start_to_close_timeout=timedelta(seconds=10),
)
When you’re all done with this file, it will look like this:
workflow.py
from dataclasses import dataclass
from datetime import timedelta
from temporalio import activity, workflow
@dataclass
class ComposeGreetingInput:
greeting: str
name: str
# Basic activity that logs and does string concatenation
@activity.defn
async def compose_greeting(input: ComposeGreetingInput) -> str:
activity.logger.info("Running activity with parameter %s" % input)
return f"{input.greeting}, {input.name}!"
# Basic workflow that logs and invokes an activity
@workflow.defn
class GreetingWorkflow:
@workflow.run
async def run(self, name: str) -> str:
workflow.logger.info("Running workflow with parameter %s" % name)
return await workflow.execute_activity(
compose_greeting,
ComposeGreetingInput("Hello", name),
start_to_close_timeout=timedelta(seconds=10),
)
Now that we have the Workflow code, we’ll tie this all together with the Worker, which is the code that Temporal uses to execute your Workflow code asynchronously as tasks show up on the queue. In this case, our example is very simple so the script will add the task to the queue right after creating the worker. In our samples repo, you will find many more in depth examples of how to build applications where different scripts outside of these two can make the workflow do other tasks asynchronously. You can even ask the workflow about itself or for the data that it’s holding for you, using a query.
Execution Code: worker.py
Let’s set up the top of our script, so that we have all the right imports and environment. This will be the skeleton of our application:
worker.py
import asyncio
from temporalio.client import Client
from temporalio.worker import Worker
from workflow import GreetingWorkflow, compose_greeting
interrupt_event = asyncio.Event()
async def main():
print("Hello, world!")
if __name__ == "__main__":
asyncio.run(main())
This example will run, in the state above, but all it will do is print “Hello, world!” to your terminal at this point.
In order to take advantage of Temporal, we need our code to talk to the Temporal Server. We connect to the server via a Client. Our code will send activities, signals, and queries to the Temporal task queue inside the Temporal Server via this connection. When these tasks on the queue are executed, they are committed to the workflow history and this allows Temporal to know exactly what code has been run, what code is left to run, and what the state of your application is at any given moment in time!
Create a Client
Replace the main()
function above with the client connection code, using the same default localhost url path and port for Temporal Server:
async def main():
# Uncomment the line below to see logging
# logging.basicConfig(level=logging.INFO)
# Start client
client = await Client.connect("localhost:7233")
Note: we are not using http://localhost:7233
, but instead localhost:7233
!
Next, we want to add what we call a worker, which is the piece of code that actually calls our workflow code for the activities on the queue.
Creating the worker
On the line just below the client, inside of main()
, add the following:
# Run a worker for the workflow
async with Worker(
client,
task_queue="hello-activity-task-queue",
workflows=[GreetingWorkflow],
activities=[compose_greeting],
):
print("Still saying ‘hello’ to you, world!")
Running this code tells Temporal Server that there is a worker ready to process tasks by passing the following information:
client
- Allows the worker to reach out and say “I’m here, Temporal Server, give me work!”task_queue
- Tells Temporal Server, “I am only set up to process tasks from this queue”workflows
- An list of python class references, calledWorkflows
(see below), written specifically to handle runningactivities
that process the tasks on the requestedtask_queue
activities
- An array of the python function references that can process tasks on the task queue
At this point, your code will look like this. All this code is doing is connecting to the Temporal Server and allowing the Worker code to run your print
statement. We are not realizing the full potential of Temporal quite yet. This code will print Still saying ‘hello’ to you, world!
to your terminal. After you run this code, if you go to the Web UI url (default is 127.0.0.1:8233
), you won’t actually see anything in your workflow list, yet.
import asyncio
from temporalio.client import Client
from temporalio.worker import Worker
from workflow import GreetingWorkflow, compose_greeting
interrupt_event = asyncio.Event()
async def main():
# Uncomment the line below to see logging
# logging.basicConfig(level=logging.INFO)
# Start client
client = await Client.connect("localhost:7233")
# Run a worker for the workflow
async with Worker(
client,
task_queue="hello-activity-task-queue",
workflows=[GreetingWorkflow],
activities=[compose_greeting],
):
print("Still saying ‘hello’ to you, world!")
if __name__ == "__main__":
asyncio.run(main())
To run our GreetingWorkflow
, we ask the Client to execute the workflow. Then, when we run our script, Temporal Server will be keeping track of what the code has done so far, and you will be able to see all the work that Temporal did for you in the Web UI.
In the code above, replace the print statement with the following:
# While the worker is running, use the client to run the workflow and
# print out its result.
result = await client.execute_workflow(
GreetingWorkflow.run,
"World",
id="hello-activity-workflow-id",
task_queue="hello-activity-task-queue",
)
print(f"Result: {result}")
This code will allow the worker to tell the client to:
- Asynchronously call execute_workflow (i.e. run the workflow
run
method) - Pass the input “World” to
GreetingWorkflow.run
- Give the workflow the identifier
hello-activity-workflow-id
- Put the tasks that the Workflow generates onto the
hello-activity-task-queue
(which you may recognize from such far flung places as a couple lines above when we created the Worker)
Running it all
Run this in your terminal: python worker.py
Once the Worker executes the code, it sets the string returned from compose_greeting
to result
and then prints it out to the terminal. You should end up seeing Hello, World!
in your terminal.
In the Web UI, you will see something like this, with one workflow row for each time your code executed:
Clicking the workflow ID for one of the rows, you will see everything that happened with your code and Temporal server when your worker executed the workflow code:
Real talk: Temporal is built for applications that do more than print text to your terminal, but we wanted to give an extremely simple way to show you the different pieces. In a follow up blog post, we will show you an actual application that uses Temporal to keep state within the workflow.
For funsies, if you want to do something also simple but that gives you a little more functionality, you can take in an input from the console with this version of worker.py
:
import asyncio
from temporalio.client import Client
from temporalio.worker import Worker
from workflow import GreetingWorkflow, compose_greeting
interrupt_event = asyncio.Event()
async def main():
# Uncomment the line below to see logging
# logging.basicConfig(level=logging.INFO)
# Start client
client = await Client.connect("localhost:7233")
# Run a worker for the workflow
async with Worker(
client,
task_queue="hello-activity-task-queue",
workflows=[GreetingWorkflow],
activities=[compose_greeting],
):
print("What's your name?")
name = input()
# While the worker is running, use the client to run the workflow and
# print out its result.
result = await client.execute_workflow(
GreetingWorkflow.run,
name,
id="hello-activity-workflow-id",
task_queue="hello-activity-task-queue",
)
print(f"Result: {result}")
if __name__ == "__main__":
asyncio.run(main())
In my case, the output looked like this:
This code also results in a slightly different WorkflowExecutionStarted
event in the workflow’s event history in the Web UI. We can see the input “Matt” was passed to the activity rather than “World”.
Click “Workflow Execution Started” on your workflow to see the difference!
Success!
There you have it, workflows and workers broken down into some areas for you to work on within your application. Hopefully this helps you get going with your Python application and a better understanding of how you can get going with Temporal.
Want to take it a step further? Check out our developer advocate’s post about how they built a poker application using some of the basics of Temporal. For a full tutorial, check out Build a Temporal Application from scratch in Python.