The Durable Task SDKs provide a lightweight client library for the Durable Task Scheduler. In this quickstart, you learn how to create orchestrations that use the fan-out/fan-in application pattern to perform parallel processing.
Important
Currently, the Durable Task SDKs aren't available for JavaScript and PowerShell.
In this quickstart, you:
- Set up and run the Durable Task Scheduler emulator for local development.
- Run the worker and client projects.
- Review orchestration status and history via the Durable Task Scheduler dashboard.
Prerequisites
Before you begin:
- Make sure you have .NET 8 SDK or later.
- Install Docker for running the emulator.
- Clone the Durable Task Scheduler GitHub repository to use the quickstart sample.
- Make sure you have Python 3.9 or later.
- Install Docker for running the emulator.
- Clone the Durable Task Scheduler GitHub repository to use the quickstart sample.
- Make sure you have Java 8 or 11.
- Install Docker for running the emulator.
- Clone the Durable Task Scheduler GitHub repository to use the quickstart sample.
Set up the Durable Task Scheduler emulator
The application code looks for a deployed scheduler and task hub resource. If none is found, the code falls back to the emulator. The emulator simulates a scheduler and task hub in a Docker container, making it ideal for the local development required in this quickstart.
From the Azure-Samples/Durable-Task-Scheduler root directory, navigate to the .NET SDK sample directory.

cd samples/portable-sdks/dotnet/FanOutFanIn
Pull the Docker image for the emulator.
docker pull mcr.microsoft.com/dts/dts-emulator:latest
Run the emulator. The container may take a few seconds to be ready.
docker run --name dtsemulator -d -p 8080:8080 -p 8082:8082 mcr.microsoft.com/dts/dts-emulator:latest
Since the example code automatically uses the default emulator settings, you don't need to set any environment variables. The default emulator settings for this quickstart are:
- Endpoint:
http://localhost:8080
- Task hub:
default
From the Azure-Samples/Durable-Task-Scheduler root directory, navigate to the Python SDK sample directory.

cd samples/portable-sdks/python/fan-out-fan-in
Pull the Docker image for the emulator.
docker pull mcr.microsoft.com/dts/dts-emulator:latest
Run the emulator. The container may take a few seconds to be ready.
docker run --name dtsemulator -d -p 8080:8080 -p 8082:8082 mcr.microsoft.com/dts/dts-emulator:latest
Since the example code automatically uses the default emulator settings, you don't need to set any environment variables. The default emulator settings for this quickstart are:
- Endpoint:
http://localhost:8080
- Task hub:
default
From the Azure-Samples/Durable-Task-Scheduler root directory, navigate to the Java SDK sample directory.

cd samples/portable-sdks/java/fan-out-fan-in
Pull the Docker image for the emulator.
docker pull mcr.microsoft.com/dts/dts-emulator:latest
Run the emulator. The container may take a few seconds to be ready.
docker run --name dtsemulator -d -p 8080:8080 -p 8082:8082 mcr.microsoft.com/dts/dts-emulator:latest
Since the example code automatically uses the default emulator settings, you don't need to set any environment variables. The default emulator settings for this quickstart are:
- Endpoint:
http://localhost:8080
- Task hub:
default
Run the quickstart
From the FanOutFanIn directory, navigate to the Worker directory to build and run the worker.

cd Worker
dotnet build
dotnet run
In a separate terminal, from the FanOutFanIn directory, navigate to the Client directory to build and run the client.

cd Client
dotnet build
dotnet run
Understanding the output
When you run this sample, you receive output from both the worker and client processes. Unpack what happened in the code when you ran the project.
Worker output
The worker output shows:
- Registration of the orchestrator and activities
- Log entries when each activity is called
- Parallel processing of multiple work items
- Final aggregation of results
Client output
The client output shows:
- The orchestration starting with a list of work items
- The unique orchestration instance ID
- The final aggregated results, showing each work item and its corresponding result
- Total count of processed items
Example output
Starting Fan-Out Fan-In Pattern - Parallel Processing Client
Using local emulator with no authentication
Starting parallel processing orchestration with 5 work items
Work items: ["Task1","Task2","Task3","LongerTask4","VeryLongTask5"]
Started orchestration with ID: 7f8e9a6b-1c2d-3e4f-5a6b-7c8d9e0f1a2b
Waiting for orchestration to complete...
Orchestration completed with status: Completed
Processing results:
Work item: Task1, Result: 5
Work item: Task2, Result: 5
Work item: Task3, Result: 5
Work item: LongerTask4, Result: 11
Work item: VeryLongTask5, Result: 13
Total items processed: 5
Activate a Python virtual environment.
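For example, on macOS or Linux you can create and activate one as follows (on Windows, run .venv\Scripts\activate instead of the source command):

```shell
# Create a virtual environment in the .venv directory and activate it
python -m venv .venv
source .venv/bin/activate
```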
Install the required packages.
pip install -r requirements.txt
Start the worker.
python worker.py
Expected output
You can see output indicating that the worker started and is waiting for work items.
Starting Fan Out/Fan In pattern worker...
Using taskhub: default
Using endpoint: http://localhost:8080
Starting gRPC worker that connects to http://localhost:8080
Successfully connected to http://localhost:8080. Waiting for work items...
In a new terminal, activate the virtual environment and run the client.
You can provide the number of work items as an argument. If no argument is provided, the example runs 10 items by default.
python client.py [number_of_items]
Understanding the output
When you run this sample, you receive output from both the worker and client processes. Unpack what happened in the code when you ran the project.
Worker output
The worker output shows:
- Registration of the orchestrator and activities.
- Status messages when processing each work item in parallel, showing that they're executing concurrently.
- Random delays for each work item (between 0.5 and 2 seconds) to simulate varying processing times.
- A final message showing the aggregation of results.
Client output
The client output shows:
- The orchestration starting with the specified number of work items.
- The unique orchestration instance ID.
- The final aggregated result, which includes:
- The total number of items processed
- The sum of all results (each item result is the square of its value)
- The average of all results
Example output
Starting fan out/fan in orchestration with 10 items
Waiting for 10 parallel tasks to complete
Orchestrator yielded with 10 task(s) and 0 event(s) outstanding.
Processing work item: 1
Processing work item: 2
Processing work item: 10
Processing work item: 9
Processing work item: 8
Processing work item: 7
Processing work item: 6
Processing work item: 5
Processing work item: 4
Processing work item: 3
Orchestrator yielded with 9 task(s) and 0 event(s) outstanding.
Orchestrator yielded with 8 task(s) and 0 event(s) outstanding.
Orchestrator yielded with 7 task(s) and 0 event(s) outstanding.
Orchestrator yielded with 6 task(s) and 0 event(s) outstanding.
Orchestrator yielded with 5 task(s) and 0 event(s) outstanding.
Orchestrator yielded with 4 task(s) and 0 event(s) outstanding.
Orchestrator yielded with 3 task(s) and 0 event(s) outstanding.
Orchestrator yielded with 2 task(s) and 0 event(s) outstanding.
Orchestrator yielded with 1 task(s) and 0 event(s) outstanding.
All parallel tasks completed, aggregating results
Orchestrator yielded with 1 task(s) and 0 event(s) outstanding.
Aggregating results from 10 items
Orchestration completed with status: COMPLETED
From the fan-out-fan-in directory, build and run the application using Gradle.
./gradlew runFanOutFanInPattern
Tip
If you receive the error message zsh: permission denied: ./gradlew, try running chmod +x gradlew before running the application.
Understanding the output
When you run this sample, you receive output that shows:
- Registration of the orchestrator and activities.
- Status messages when processing each work item in parallel, showing that they're executing concurrently.
- Random delays for each work item (between 0.5 and 2 seconds) to simulate varying processing times.
- A final message showing the aggregation of results.
Unpack what happened in the code when you ran the project.
Example output
Starting a Gradle Daemon (subsequent builds will be faster)
> Task :runFanOutFanInPattern
Durable Task worker is connecting to sidecar at localhost:8080.
Started new orchestration instance
Orchestration completed: [Name: 'FanOutFanIn_WordCount', ID: '<id-number>', RuntimeStatus: COMPLETED, CreatedAt: 2025-04-25T15:24:47.170Z, LastUpdatedAt: 2025-04-25T15:24:47.287Z, Input: '["Hello, world!","The quick brown fox jumps over t...', Output: '60']
Output: 60
Now that you've run the project locally, you can learn how to deploy the app to Azure Container Apps.
View orchestration status and history
You can view the orchestration status and history via the Durable Task Scheduler dashboard. By default, the dashboard runs on port 8082.
- Navigate to http://localhost:8082 in your web browser.
- Click the default task hub. The orchestration instance you created is in the list.
- Click the orchestration instance ID to view the execution details, which include:
- The parallel execution of multiple activity tasks
- The fan-in aggregation step
- The input and output at each step
- The time taken for each step
Understanding the code structure
The worker project
To demonstrate the fan-out/fan-in pattern, the worker project orchestration creates parallel activity tasks and waits for all to complete. The orchestrator:
- Takes a list of work items as input.
- Fans out by creating a separate task for each work item using ProcessWorkItemActivity.
- Executes all tasks in parallel.
- Waits for all tasks to complete using Task.WhenAll.
- Fans in by aggregating all individual results using AggregateResultsActivity.
- Returns the final aggregated result to the client.
The worker project contains:
- ParallelProcessingOrchestration.cs: Defines the orchestrator and activity functions in a single file.
- Program.cs: Sets up the worker host with proper connection string handling.
ParallelProcessingOrchestration.cs
Using fan-out/fan-in, the orchestration creates parallel activity tasks and waits for all to complete.
public override async Task<Dictionary<string, int>> RunAsync(TaskOrchestrationContext context, List<string> workItems)
{
// Step 1: Fan-out by creating a task for each work item in parallel
List<Task<Dictionary<string, int>>> processingTasks = new List<Task<Dictionary<string, int>>>();
foreach (string workItem in workItems)
{
// Create a task for each work item (fan-out)
Task<Dictionary<string, int>> task = context.CallActivityAsync<Dictionary<string, int>>(
nameof(ProcessWorkItemActivity), workItem);
processingTasks.Add(task);
}
// Step 2: Wait for all parallel tasks to complete
Dictionary<string, int>[] results = await Task.WhenAll(processingTasks);
// Step 3: Fan-in by aggregating all results
Dictionary<string, int> aggregatedResults = await context.CallActivityAsync<Dictionary<string, int>>(
nameof(AggregateResultsActivity), results);
return aggregatedResults;
}
Each activity is implemented as a separate class decorated with the [DurableTask] attribute.
[DurableTask]
public class ProcessWorkItemActivity : TaskActivity<string, Dictionary<string, int>>
{
// Implementation processes a single work item
}
[DurableTask]
public class AggregateResultsActivity : TaskActivity<Dictionary<string, int>[], Dictionary<string, int>>
{
// Implementation aggregates individual results
}
Program.cs
The worker uses Microsoft.Extensions.Hosting for proper lifecycle management.
using Microsoft.Extensions.Hosting;
//..
builder.Services.AddDurableTaskWorker()
.AddTasks(registry =>
{
registry.AddOrchestrator<ParallelProcessingOrchestration>();
registry.AddActivity<ProcessWorkItemActivity>();
registry.AddActivity<AggregateResultsActivity>();
})
.UseDurableTaskScheduler(connectionString);
The client project
The client project:
- Uses the same connection string logic as the worker.
- Creates a list of work items to be processed in parallel.
- Schedules an orchestration instance with the list as input.
- Waits for the orchestration to complete and displays the aggregated results.
- Uses WaitForInstanceCompletionAsync for efficient polling.
List<string> workItems = new List<string>
{
"Task1",
"Task2",
"Task3",
"LongerTask4",
"VeryLongTask5"
};
// Schedule the orchestration with the work items
string instanceId = await client.ScheduleNewOrchestrationInstanceAsync(
"ParallelProcessingOrchestration",
workItems);
// Wait for completion
var instance = await client.WaitForInstanceCompletionAsync(
instanceId,
getInputsAndOutputs: true,
cts.Token);
worker.py
To demonstrate the fan-out/fan-in pattern, the worker orchestration creates parallel activity tasks and waits for all to complete. The orchestrator:
- Receives a list of work items as input.
- Fans out by creating a parallel task for each work item (calling process_work_item for each one).
- Waits for all tasks to complete using task.when_all.
- Fans in by aggregating the results with the aggregate_results activity.
- Returns the final aggregated result to the client.
Using fan-out/fan-in, the orchestration creates parallel activity tasks and waits for all to complete.
# Orchestrator function
def fan_out_fan_in_orchestrator(ctx, work_items: list) -> dict:
logger.info(f"Starting fan out/fan in orchestration with {len(work_items)} items")
# Fan out: Create a task for each work item
parallel_tasks = []
for item in work_items:
parallel_tasks.append(ctx.call_activity("process_work_item", input=item))
# Wait for all tasks to complete
logger.info(f"Waiting for {len(parallel_tasks)} parallel tasks to complete")
results = yield task.when_all(parallel_tasks)
# Fan in: Aggregate all the results
logger.info("All parallel tasks completed, aggregating results")
final_result = yield ctx.call_activity("aggregate_results", input=results)
return final_result
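The sample's activity implementations aren't shown here. A minimal sketch of what they might look like, based only on the behavior described above (random 0.5–2 second delays, each result being the square of its work item, and an aggregate of count, sum, and average), could be:

```python
import random
import time

def process_work_item(ctx, item: int) -> int:
    # Simulate variable processing time (between 0.5 and 2 seconds)
    time.sleep(random.uniform(0.5, 2.0))
    # Each result is the square of its work item value
    return item * item

def aggregate_results(ctx, results: list) -> dict:
    # Fan-in: combine the individual results into a single summary
    total = sum(results)
    return {
        "count": len(results),
        "sum": total,
        "average": total / len(results) if results else 0,
    }
```

The actual sample code may name or structure these differently; this sketch only illustrates the fan-out work unit and the fan-in aggregation step.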
client.py
The client project:
- Uses the same connection string logic as the worker.
- Creates a list of work items to be processed in parallel.
- Schedules an orchestration instance with the list as input.
- Waits for the orchestration to complete and displays the aggregated results.
- Uses wait_for_orchestration_completion for efficient polling.
# Generate work items (default 10 items if not specified)
count = int(sys.argv[1]) if len(sys.argv) > 1 else 10
work_items = list(range(1, count + 1))
logger.info(f"Starting new fan out/fan in orchestration with {count} work items")
# Schedule a new orchestration instance
instance_id = client.schedule_new_orchestration(
"fan_out_fan_in_orchestrator",
input=work_items
)
logger.info(f"Started orchestration with ID = {instance_id}")
# Wait for orchestration to complete
logger.info("Waiting for orchestration to complete...")
result = client.wait_for_orchestration_completion(
instance_id,
timeout=60
)
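The connection settings used by the worker and client fall back to the emulator defaults when nothing is configured. A minimal sketch of that fallback logic (the environment variable names here are assumptions, not necessarily what the sample uses):

```python
import os

# Fall back to the local emulator defaults when no scheduler is configured
endpoint = os.environ.get("ENDPOINT", "http://localhost:8080")
taskhub = os.environ.get("TASKHUB", "default")

print(f"Using taskhub: {taskhub}")
print(f"Using endpoint: {endpoint}")
```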
To demonstrate the fan-out/fan-in pattern, the FanOutFanInPattern project orchestration creates parallel activity tasks and waits for all to complete. The orchestrator:
- Takes a list of work items as input.
- Fans out by creating a separate task for each work item using the CountWords activity.
- Executes all tasks in parallel.
- Waits for all tasks to complete using ctx.allOf(tasks).await().
- Fans in by summing the individual word counts.
- Returns the final aggregated result to the client.
The project contains:
- A worker, created with DurableTaskSchedulerWorkerExtensions, that defines the orchestrator and activity functions.
- A client, created with DurableTaskSchedulerClientExtensions, that schedules the orchestration and waits for its result.
Worker
Using fan-out/fan-in, the orchestration creates parallel activity tasks and waits for all to complete.
DurableTaskGrpcWorker worker = DurableTaskSchedulerWorkerExtensions.createWorkerBuilder(connectionString)
.addOrchestration(new TaskOrchestrationFactory() {
@Override
public String getName() { return "FanOutFanIn_WordCount"; }
@Override
public TaskOrchestration create() {
return ctx -> {
List<?> inputs = ctx.getInput(List.class);
List<Task<Integer>> tasks = inputs.stream()
.map(input -> ctx.callActivity("CountWords", input.toString(), Integer.class))
.collect(Collectors.toList());
List<Integer> allWordCountResults = ctx.allOf(tasks).await();
int totalWordCount = allWordCountResults.stream().mapToInt(Integer::intValue).sum();
ctx.complete(totalWordCount);
};
}
})
.addActivity(new TaskActivityFactory() {
@Override
public String getName() { return "CountWords"; }
@Override
public TaskActivity create() {
return ctx -> {
String input = ctx.getInput(String.class);
StringTokenizer tokenizer = new StringTokenizer(input);
return tokenizer.countTokens();
};
}
})
.build();
// Start the worker
worker.start();
Client
The client project:
- Uses the same connection string logic as the worker.
- Creates a list of work items to be processed in parallel.
- Schedules an orchestration instance with the list as input.
- Waits for the orchestration to complete and displays the aggregated results.
- Uses waitForInstanceCompletion for efficient polling.
DurableTaskClient client = DurableTaskSchedulerClientExtensions.createClientBuilder(connectionString).build();
// The input is an arbitrary list of strings.
List<String> listOfStrings = Arrays.asList(
"Hello, world!",
"The quick brown fox jumps over the lazy dog.",
"If a tree falls in the forest and there is no one there to hear it, does it make a sound?",
"The greatest glory in living lies not in never falling, but in rising every time we fall.",
"Always remember that you are absolutely unique. Just like everyone else.");
// Schedule an orchestration which will reliably count the number of words in all the given sentences.
String instanceId = client.scheduleNewOrchestrationInstance(
"FanOutFanIn_WordCount",
new NewOrchestrationInstanceOptions().setInput(listOfStrings));
logger.info("Started new orchestration instance: {}", instanceId);
// Block until the orchestration completes. Then print the final status, which includes the output.
OrchestrationMetadata completedInstance = client.waitForInstanceCompletion(
instanceId,
Duration.ofSeconds(30),
true);
logger.info("Orchestration completed: {}", completedInstance);
logger.info("Output: {}", completedInstance.readOutputAs(int.class));
Next steps
Now that you've run the sample locally using the Durable Task Scheduler emulator, try creating a scheduler and task hub resource and deploying to Azure Container Apps.