Core application - Python SDK
This page shows how to do the following:
- Develop a basic Workflow
- Define Workflow parameters
- Define Workflow return parameters
- Customize your Workflow Type
- Develop Workflow logic
- Develop a basic Activity
- Develop Activity Parameters
- Define Activity return values
- Customize your Activity Type
- Start an Activity Execution
- Set the required Activity Timeouts
- Get the results of an Activity Execution
- Run a Worker Process
- Register types
Develop a basic Workflow
How to develop a basic Workflow using the Temporal Python SDK.
Workflows are the fundamental unit of a Temporal Application, and it all starts with the development of a Workflow Definition.
In the Temporal Python SDK programming model, Workflows are defined as classes.
Specify the @workflow.defn decorator on the Workflow class to identify a Workflow.
Use the @workflow.run decorator to mark the entry point method to be invoked. This must be set on one asynchronous method defined on the same class as @workflow.defn. Run methods have positional parameters.
View the source code in the context of the rest of the application code.
from datetime import timedelta

from temporalio import workflow
# ...
@workflow.defn(name="YourWorkflow")
class YourWorkflow:
    @workflow.run
    async def run(self, name: str) -> str:
        return await workflow.execute_activity(
            your_activity,
            YourParams("Hello", name),
            start_to_close_timeout=timedelta(seconds=10),
        )
Define Workflow parameters
How to define Workflow parameters using the Temporal Python SDK.
Temporal Workflows may have any number of custom parameters. However, we strongly recommend that objects are used as parameters, so that the object's individual fields may be altered without breaking the signature of the Workflow. All Workflow Definition parameters must be serializable.
Workflow parameters are the method parameters of the singular method decorated with @workflow.run.
These can be any data type Temporal can convert, including dataclasses when properly type-annotated.
Technically this can be multiple parameters, but Temporal strongly encourages a single dataclass parameter containing all input fields.
View the source code in the context of the rest of the application code.
from dataclasses import dataclass
# ...
@dataclass
class YourParams:
    greeting: str
    name: str
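The reason for preferring a single dataclass parameter can be seen without any Temporal machinery. The following is a minimal plain-Python sketch, not SDK code: `build_message` is a hypothetical stand-in for a Workflow run method, and the `punctuation` field is an invented later addition. Because the new field has a default, callers written against the older shape keep working.

```python
from dataclasses import dataclass


@dataclass
class YourParams:
    greeting: str
    name: str
    # Hypothetical field added later; the default keeps existing callers working.
    punctuation: str = "!"


def build_message(params: YourParams) -> str:
    # Stand-in for a Workflow run method that takes one dataclass argument.
    return f"{params.greeting}, {params.name}{params.punctuation}"


# A caller written before `punctuation` existed still works unchanged.
old_style = build_message(YourParams("Hello", "World"))
# A newer caller can opt in to the extra field.
new_style = build_message(YourParams("Hello", "World", punctuation="?"))
```

Had `greeting` and `name` been separate positional parameters, adding a third input would have changed the method signature for every caller.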
Define Workflow return parameters
How to define Workflow return parameters using the Temporal Python SDK.
Workflow return values must also be serializable. Returning results, returning errors, or throwing exceptions is fairly idiomatic in each language that is supported. However, Temporal APIs that must be used to get the result of a Workflow Execution will only ever receive one of either the result or the error.
To return a value from the Workflow, use return in the run method.
To get the results of a Workflow Execution, use either the start_workflow() or the execute_workflow() asynchronous method.
View the source code in the context of the rest of the application code.
from datetime import timedelta

from temporalio import workflow
# ...
@workflow.defn(name="YourWorkflow")
class YourWorkflow:
    @workflow.run
    async def run(self, name: str) -> str:
        return await workflow.execute_activity(
            your_activity,
            YourParams("Hello", name),
            start_to_close_timeout=timedelta(seconds=10),
        )
Customize your Workflow Type
How to customize your Workflow Type using the Temporal Python SDK.
Workflows have a Type that is referred to as the Workflow name.
The following examples demonstrate how to set a custom name for your Workflow Type.
You can customize the Workflow name with a custom name in the decorator argument. For example, @workflow.defn(name="your-workflow-name"). If the name parameter is not specified, the Workflow name defaults to the unqualified class name.
View the source code in the context of the rest of the application code.
from datetime import timedelta

from temporalio import workflow
# ...
@workflow.defn(name="YourWorkflow")
class YourWorkflow:
    @workflow.run
    async def run(self, name: str) -> str:
        return await workflow.execute_activity(
            your_activity,
            YourParams("Hello", name),
            start_to_close_timeout=timedelta(seconds=10),
        )
Develop Workflow logic
How to develop Workflow logic using the Temporal Python SDK.
Workflow logic is constrained by deterministic execution requirements. Therefore, each language is limited to the use of certain idiomatic techniques. However, each Temporal SDK provides a set of APIs that can be used inside your Workflow to interact with external (to the Workflow) application code.
Workflow code must be deterministic. This means:
- no threading
- no randomness
- no external calls to processes
- no network I/O
- no global state mutation
- no system date or time
All APIs in temporalio.workflow that are safe to use in Workflows must run in the implicit asyncio event loop and be deterministic.
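To make the determinism requirement concrete, the sketch below uses only the standard library to imitate a replay. It is an illustration under assumptions, not SDK code: `pick_discount` and `run_workflow_logic` are hypothetical names, and the explicit seed stands in for whatever deterministic source the SDK supplies. The point is that logic driven by a seeded generator produces the same decisions every time it is re-run, whereas the unseeded module-level `random` functions would not.

```python
import random


def pick_discount(rng: random.Random) -> int:
    # Hypothetical Workflow decision that depends on a random number.
    return rng.choice([5, 10, 15, 20])


def run_workflow_logic(seed: int) -> list:
    # A seeded generator yields the same sequence every time it is replayed.
    rng = random.Random(seed)
    return [pick_discount(rng) for _ in range(5)]


first_run = run_workflow_logic(seed=42)
replayed = run_workflow_logic(seed=42)
```

Inside a real Workflow, the `temporalio.workflow` module offers replay-safe helpers such as `workflow.random()` and `workflow.now()` for this purpose, so the system random generator and clock are not needed.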
Develop a basic Activity
How to develop a basic Activity using the Temporal Python SDK.
One of the primary things that Workflows do is orchestrate the execution of Activities. An Activity is a normal function or method execution that's intended to execute a single, well-defined action (either short or long-running), such as querying a database, calling a third-party API, or transcoding a media file. An Activity can interact with the world outside the Temporal Platform or use a Temporal Client to interact with a Temporal Service. For the Workflow to be able to execute the Activity, we must define the Activity Definition.
You can develop an Activity Definition by using the @activity.defn decorator.
Register the function as an Activity with a custom name through a decorator argument, for example @activity.defn(name="your_activity").
The Temporal Python SDK supports multiple ways of implementing an Activity:
- Asynchronously using asyncio
- Synchronously multithreaded using concurrent.futures.ThreadPoolExecutor
- Synchronously multiprocess using concurrent.futures.ProcessPoolExecutor and multiprocessing.managers.SyncManager
Blocking the async event loop in Python turns your asynchronous program into a synchronous program that executes serially, defeating the entire purpose of using asyncio.
This can also lead to potential deadlocks and unpredictable behavior that leaves tasks unable to execute.
Debugging these issues can be difficult and time consuming, as the source of the blocking call is not always immediately obvious.
For this reason, avoid making blocking calls from within an asynchronous Activity, or use an async-safe library to perform these actions. If you must use a blocking library, consider using a synchronous Activity instead.
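The effect of a blocking call on the event loop can be observed with the standard library alone. The sketch below is not Temporal code, and every name in it (`blocking_work`, `offloaded_work`, `count_ticks`) is hypothetical. A background "ticker" task counts how often the event loop lets it run while some work executes: once with a blocking `time.sleep()` inside a coroutine, and once with the same call moved to a thread through `asyncio.to_thread()` (Python 3.9 or later).

```python
import asyncio
import time


async def blocking_work() -> None:
    # time.sleep() never yields, so the event loop is frozen for 0.2 s.
    time.sleep(0.2)


async def offloaded_work() -> None:
    # The same blocking call, moved to a worker thread (Python 3.9+).
    await asyncio.to_thread(time.sleep, 0.2)


async def count_ticks(work) -> int:
    """Count how often a background task gets to run while `work` executes."""
    ticks = 0

    async def ticker() -> None:
        nonlocal ticks
        while True:
            await asyncio.sleep(0.01)
            ticks += 1

    task = asyncio.create_task(ticker())
    await asyncio.sleep(0)  # give the ticker a chance to start
    await work()
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass
    return ticks


async def main() -> tuple:
    blocked = await count_ticks(blocking_work)
    offloaded = await count_ticks(offloaded_work)
    return blocked, offloaded


if __name__ == "__main__":
    blocked, offloaded = asyncio.run(main())
    print(f"ticks while blocking: {blocked}, ticks while offloaded: {offloaded}")
```

In the blocking variant the ticker never gets a turn, so its count stays at zero. In the offloaded variant it keeps ticking, which is the behavior other Activities and heartbeats sharing the same event loop depend on.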
View the source code in the context of the rest of the application code.
from temporalio import activity
# ...
@activity.defn(name="your_activity")
async def your_activity(input: YourParams) -> str:
    return f"{input.greeting}, {input.name}!"
Develop Activity Parameters
How to develop Activity Parameters using the Temporal Python SDK.
There is no explicit limit to the total number of parameters that an Activity Definition may support. However, there is a limit to the total size of the data that ends up encoded into a gRPC message Payload.
A single argument is limited to a maximum size of 2 MB. And the total size of a gRPC message, which includes all the arguments, is limited to a maximum of 4 MB.
Also, keep in mind that all Payload data is recorded in the Workflow Execution Event History and large Event Histories can affect Worker performance. This is because the entire Event History could be transferred to a Worker Process with a Workflow Task.
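As a rough way to reason about these limits before data reaches an Activity, the size of an argument can be estimated ahead of time. The sketch below is an approximation under stated assumptions: it treats the argument as UTF-8 JSON and reads "2 MB" as 2 * 1024 * 1024 bytes, and the helper names `estimated_payload_size` and `fits_in_single_argument` are hypothetical. The encoded size that Temporal actually records may differ, so treat the result as a guard rail rather than an exact measurement.

```python
import json
from dataclasses import asdict, dataclass

# Assumption: the 2 MB per-argument limit interpreted as 2 * 1024 * 1024 bytes.
MAX_ARGUMENT_BYTES = 2 * 1024 * 1024


@dataclass
class YourParams:
    greeting: str
    name: str


def estimated_payload_size(params: YourParams) -> int:
    # Rough estimate only: assumes the argument is encoded as UTF-8 JSON.
    return len(json.dumps(asdict(params)).encode("utf-8"))


def fits_in_single_argument(params: YourParams) -> bool:
    return estimated_payload_size(params) <= MAX_ARGUMENT_BYTES


small = YourParams("Hello", "World")
large = YourParams("Hello", "x" * (3 * 1024 * 1024))
```

When an argument does not fit, a common design choice is to store the data externally and pass only a reference to it, which also keeps the Event History small.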
Some SDKs require that you pass context objects, others do not. When it comes to your application data—that is, data that is serialized and encoded into a Payload—we recommend that you use a single object as an argument that wraps the application data passed to Activities. This is so that you can change what data is passed to the Activity without breaking a function or method signature.
Activity parameters are the function parameters of the function decorated with @activity.defn.
These can be any data type Temporal can convert, including dataclasses when properly type-annotated.
Technically this can be multiple parameters, but Temporal strongly encourages a single dataclass parameter containing all input fields.
View the source code in the context of the rest of the application code.
from temporalio import activity
from your_dataobject_dacx import YourParams
# ...
@activity.defn(name="your_activity")
async def your_activity(input: YourParams) -> str:
    return f"{input.greeting}, {input.name}!"
Define Activity return values
How to define Activity return values using the Temporal Python SDK.
All data returned from an Activity must be serializable.
There is no explicit limit to the amount of data that can be returned by an Activity, but keep in mind that all return values are recorded in a Workflow Execution Event History.
An Activity Execution can return inputs and other Activity values.
The following example defines an Activity that takes a YourParams dataclass as input and returns a string.
View the source code in the context of the rest of the application code.
# ...
@activity.defn(name="your_activity")
async def your_activity(input: YourParams) -> str:
    return f"{input.greeting}, {input.name}!"
Customize your Activity Type
How to customize your Activity Type using the Temporal Python SDK.
Activities have a Type that is referred to as the Activity name. The following examples demonstrate how to set a custom name for your Activity Type.
You can customize the Activity name with a custom name in the decorator argument. For example, @activity.defn(name="your-activity").
If the name parameter is not specified, the Activity name defaults to the function name.
View the source code in the context of the rest of the application code.
# ...
@activity.defn(name="your_activity")
async def your_activity(input: YourParams) -> str:
    return f"{input.greeting}, {input.name}!"
Start an Activity Execution
How to start an Activity Execution using the Temporal Python SDK.
Calls to spawn Activity Executions are written within a Workflow Definition. The call to spawn an Activity Execution generates the ScheduleActivityTask Command. This results in the set of three Activity Task-related Events (ActivityTaskScheduled, ActivityTaskStarted, and ActivityTask[Closed]) in your Workflow Execution Event History.
A single instance of the Activities implementation is shared across multiple simultaneous Activity invocations. Activity implementation code should be idempotent.
The values passed to Activities through invocation parameters or returned through a result value are recorded in the Execution history. The entire Execution history is transferred from the Temporal service to Workflow Workers when a Workflow state needs to recover. A large Execution history can thus adversely impact the performance of your Workflow.
Therefore, be mindful of the amount of data you transfer through Activity invocation parameters or Return Values. Otherwise, no additional limitations exist on Activity implementations.
To spawn an Activity Execution, use the execute_activity() operation from within your Workflow Definition.
execute_activity() is a shortcut for start_activity() that waits on its result.
To get just the handle to wait and cancel separately, use start_activity().
In most cases, use execute_activity() unless advanced task capabilities are needed.
A single argument to the Activity is positional. Multiple arguments are not supported in the type-safe form of start_activity() or execute_activity() and must be supplied by the args keyword argument.
View the source code in the context of the rest of the application code.
from datetime import timedelta

from temporalio import workflow
# ...
@workflow.defn(name="YourWorkflow")
class YourWorkflow:
    @workflow.run
    async def run(self, name: str) -> str:
        return await workflow.execute_activity(
            your_activity,
            YourParams("Hello", name),
            start_to_close_timeout=timedelta(seconds=10),
        )
Set the required Activity Timeouts
How to set the required Activity Timeouts using the Temporal Python SDK.
Activity Execution semantics rely on several parameters. The only required value that needs to be set is either a Schedule-To-Close Timeout or a Start-To-Close Timeout. These values are set in the Activity Options.
Activity options are set as keyword arguments after the Activity arguments.
Available timeouts are:
- schedule_to_close_timeout
- schedule_to_start_timeout
- start_to_close_timeout
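The "at least one of two" rule for these timeouts can be captured in a small helper. The sketch below is a hypothetical convenience function, not part of the SDK: `activity_timeout_options` simply builds a dictionary of the keyword arguments listed above and raises early if neither required timeout is set.

```python
from datetime import timedelta
from typing import Dict, Optional


def activity_timeout_options(
    start_to_close: Optional[timedelta] = None,
    schedule_to_close: Optional[timedelta] = None,
    schedule_to_start: Optional[timedelta] = None,
) -> Dict[str, timedelta]:
    """Build timeout keyword arguments, enforcing the 'at least one' rule."""
    if start_to_close is None and schedule_to_close is None:
        raise ValueError(
            "Set start_to_close_timeout or schedule_to_close_timeout"
        )
    candidates = {
        "start_to_close_timeout": start_to_close,
        "schedule_to_close_timeout": schedule_to_close,
        "schedule_to_start_timeout": schedule_to_start,
    }
    # Drop unset timeouts so only explicit values are passed along.
    return {key: value for key, value in candidates.items() if value is not None}


options = activity_timeout_options(start_to_close=timedelta(seconds=10))
```

The resulting dictionary could then be unpacked with `**options` after the Activity arguments in an execute_activity() call.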
View the source code in the context of the rest of the application code.
# ...
activity_timeout_result = await workflow.execute_activity(
    your_activity,
    YourParams(greeting, "Activity Timeout option"),
    # Activity Execution Timeout
    start_to_close_timeout=timedelta(seconds=10),
    # schedule_to_start_timeout=timedelta(seconds=10),
    # schedule_to_close_timeout=timedelta(seconds=10),
)
Get the results of an Activity Execution
How to get the results of an Activity Execution using the Temporal Python SDK.
The call to spawn an Activity Execution generates the ScheduleActivityTask Command and provides the Workflow with an Awaitable. Workflow Executions can either block progress until the result is available through the Awaitable or continue progressing, making use of the result when it becomes available.
Use start_activity() to start an Activity and return its handle, ActivityHandle. Use execute_activity() to return the results.
You must provide either schedule_to_close_timeout or start_to_close_timeout.
execute_activity() is an asynchronous helper that takes the same arguments as start_activity() and awaits the result, which makes it a shortcut for await start_activity(). Use execute_activity() in most cases unless advanced task capabilities are needed.
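The difference between waiting immediately and holding a handle can be illustrated with plain asyncio. This is an analogy rather than Temporal code: `fake_activity` is a hypothetical stand-in for an Activity, `asyncio.create_task()` plays the role of start_activity(), and a direct `await` plays the role of execute_activity().

```python
import asyncio


async def fake_activity(name: str) -> str:
    # Stand-in for an Activity; the sleep simulates work done elsewhere.
    await asyncio.sleep(0.01)
    return f"Hello, {name}!"


async def execute_style() -> str:
    # Analogous to execute_activity(): start and wait in a single step.
    return await fake_activity("World")


async def start_style() -> tuple:
    # Analogous to start_activity(): keep a handle, do other work, then await.
    handle = asyncio.create_task(fake_activity("World"))
    other_work = "prepared while the activity runs"
    result = await handle
    return other_work, result


execute_result = asyncio.run(execute_style())
start_result = asyncio.run(start_style())
```

Both styles end with the same result; the handle-based form only pays off when the Workflow has something useful to do, or may need to cancel, before the result arrives.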
View the source code in the context of the rest of the application code.
from datetime import timedelta

from temporalio import workflow
# ...
@workflow.defn(name="YourWorkflow")
class YourWorkflow:
    @workflow.run
    async def run(self, name: str) -> str:
        return await workflow.execute_activity(
            your_activity,
            YourParams("Hello", name),
            start_to_close_timeout=timedelta(seconds=10),
        )
Run a Worker Process
How to run a Worker Process using the Temporal Python SDK.
The Worker Process is where Workflow Functions and Activity Functions are executed.
- Each Worker Entity in the Worker Process must register the exact Workflow Types and Activity Types it may execute.
- Each Worker Entity must also associate itself with exactly one Task Queue.
- Each Worker Entity polling the same Task Queue must be registered with the same Workflow Types and Activity Types.
A Worker Entity is the component within a Worker Process that listens to a specific Task Queue.
Although multiple Worker Entities can be in a single Worker Process, a Worker Process with a single Worker Entity may be perfectly sufficient. For more information, see the Worker tuning guide.
A Worker Entity contains a Workflow Worker and/or an Activity Worker, which makes progress on Workflow Executions and Activity Executions, respectively.
To develop a Worker, use the Worker() constructor and add your Client, Task Queue, Workflows, and Activities as arguments.
The following code example creates a Worker that polls for tasks from the Task Queue and executes the Workflow.
When a Worker is created, it accepts a list of Workflows in the workflows parameter, a list of Activities in the activities parameter, or both.
View the source code in the context of the rest of the application code.
import asyncio

from temporalio.client import Client
from temporalio.worker import Worker
# ...
async def main():
    client = await Client.connect("localhost:7233")
    worker = Worker(
        client,
        task_queue="your-task-queue",
        workflows=[YourWorkflow],
        activities=[your_activity],
    )
    await worker.run()

if __name__ == "__main__":
    asyncio.run(main())
Register types
How to register types using the Temporal Python SDK.
All Workers listening to the same Task Queue name must be registered to handle the exact same Workflow Types and Activity Types.
If a Worker polls a Task for a Workflow Type or Activity Type it does not know about, it fails that Task. However, the failure of the Task does not cause the associated Workflow Execution to fail.
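One way to catch mismatched registrations before deployment is to compare what each Worker on a Task Queue declares. The sketch below is a hypothetical pre-flight check in plain Python, not an SDK feature: the worker names and the `find_registration_gaps` helper are invented for illustration, and the type names are taken from the examples on this page.

```python
from typing import Dict, Set


def find_registration_gaps(workers: Dict[str, Set[str]]) -> Dict[str, Set[str]]:
    """Return, per worker, the types registered elsewhere on the queue but missing here."""
    all_types = set().union(*workers.values())
    return {
        worker: all_types - registered
        for worker, registered in workers.items()
        if all_types - registered
    }


# Hypothetical registrations for two Workers polling the same Task Queue.
registrations = {
    "worker-a": {"YourWorkflow", "your_activity"},
    "worker-b": {"YourWorkflow"},
}
gaps = find_registration_gaps(registrations)
```

Here `worker-b` would fail any Activity Task for `your_activity` that it happens to poll, so the check reports that type as missing for it.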
When a Worker is created, it accepts a list of Workflows in the workflows parameter, a list of Activities in the activities parameter, or both.
View the source code in the context of the rest of the application code.
# ...
async def main():
    client = await Client.connect("localhost:7233")
    worker = Worker(
        client,
        task_queue="your-task-queue",
        workflows=[YourWorkflow],
        activities=[your_activity],
    )
    await worker.run()

if __name__ == "__main__":
    asyncio.run(main())