# Batch tasks in workflows
Using Batch tasks in workflows requires a few more steps than using Lambda tasks. Lambda integrates more directly with Step Functions than Batch does, as Batch has an additional layer of infrastructure in between. As a result, users must keep a few more things in mind when using Batch tasks.
## `pre-batch` and `post-batch`
Because Batch tasks cannot take the input payload natively like Lambda and must redirect through S3, Cirrus provides a built-in Lambda task to assist. The `pre-batch` task takes an input payload, uploads it to S3, and returns an output JSON payload with a `url` key with the S3 payload URL as its value. `pre-batch` should always be run immediately preceding a Batch task for this purpose.
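To make that contract concrete: given a full process payload as input, `pre-batch` returns only a pointer to the uploaded copy, roughly like the sketch below. The bucket name and key layout here are illustrative assumptions, not the actual values Cirrus uses.

```json
{
    "url": "s3://example-payload-bucket/batch-example/execution-id/input.json"
}
```

The Batch job then receives this URL (see the `url.$` job parameter in the minimal example below) and fetches the full payload from S3 itself.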
Similarly, Batch does not integrate returned payloads with step functions nearly as well as Lambda, so Cirrus has a built-in `post-batch` task to help with, well, post-batch operations. Specifically, `post-batch` can pull the Batch payload from S3 and return it in the case of a successful Batch execution. In the case of a Batch error, `post-batch` will scrape the execution logs for an exception or other error message and raise it within the step function. This helps “bubble” Batch task errors up to the step function. Errors can then be handled within the step function semantics or used to fail the step function execution with error context that can ultimately be pushed into the state database, increasing error visibility and assisting with troubleshooting. Therefore, like `pre-batch`, a `post-batch` step should always immediately follow a Batch task step.
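As an illustration of the failure path: if the Batch step's `Catch` places the error object at `$.error` (as the minimal example below does), `post-batch` receives input shaped roughly like the following sketch. The bucket and key are assumptions; the `Error`/`Cause` structure is the standard Step Functions catch result.

```json
{
    "url": "s3://example-payload-bucket/batch-example/execution-id/input.json",
    "error": {
        "Error": "States.TaskFailed",
        "Cause": "<JSON describing the failed Batch job; rarely the underlying exception>"
    }
}
```

Because the `Cause` reported by Batch is rarely descriptive, `post-batch` goes to the job's logs to find the real exception and re-raises it so the step function records a meaningful error.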
## Put it all together with a `parallel` block
Instead of simply being able to have a single step in a step function for a Batch task, we end up with three steps. Because of the way they operate together, we can think of the `pre-batch` -> Batch task -> `post-batch` triad as a single step from the perspective of error handling and retries. That is, if we encounter an error anywhere in that set of three, we either want to fail them all or retry them all together.
Enter the step function `parallel` block. Step functions provide this control primitive to allow users to define one or more branches in a workflow that can execute in parallel. Conveniently for us, `parallel` supports both `Catch` and `Retry` policies for error handling, which provides us with the control we need for Batch.
## Error handling
It is essential that workflows properly handle errors when using Batch, as there are more things that can go wrong than when using Lambda. For example, when too many Step Functions executions are trying to create new Batch Jobs at once, an `AWSBatchException` is thrown. When the EC2 instance that a Batch Job is running on is reclaimed, as happens frequently with Spot instances at scale, a retry should occur so the Batch Job is attempted again. This is why it is recommended to wrap the Batch steps in a `Parallel` block with a retry policy.
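As a sketch of the two retry scopes this implies, the Batch submission step retries narrowly on the throttling error, while the enclosing `Parallel` block retries the whole group on anything else, such as a Spot reclamation. The intervals and attempt counts below are illustrative, mirroring the minimal example later in this section.

```yaml
batch-task:      # fragment: the Batch submitJob task state
  Retry:
    # Throttled job submission: back off and resubmit
    - ErrorEquals: ["Batch.AWSBatchException"]
      IntervalSeconds: 600
      BackoffRate: 2.0
      MaxAttempts: 20

batch-group:     # fragment: the enclosing Parallel state
  Retry:
    # Any other failure, e.g. a reclaimed Spot instance, retries the
    # whole pre-batch -> batch-task -> post-batch group together
    - ErrorEquals: ["States.ALL"]
      IntervalSeconds: 1200
      BackoffRate: 2.0
      MaxAttempts: 3
```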
## Additional considerations
Batch does not perform well when there are many jobs that run quickly. For example, if a task filters out a significant number of payloads quickly, the overhead of placing the jobs onto compute resources will dominate the runtime, resulting in a slow and inefficient pipeline. A few examples from Earth Search:
- The Landsat SNS topic (`public-c2-notify-v2`) that initiates ingest includes many “Real-Time” (RT) scenes that are ignored. These result in a runtime of only a few seconds. If these were run with Batch, there would be a few minutes of overhead for job placement (incurring the cost of the EC2 instances for that time) for only a few seconds of actual use.
- Even without the aforementioned RT scenes, Landsat ingest uses Lambda instead of Batch because the task only performs a metadata-to-metadata conversion that takes tens of seconds per scene. The overhead for Batch is far greater than the actual runtime, and the task runtime is both low (much less than the Lambda maximum of 15 minutes) and consistent.
- The Sentinel-2 Collection 1 Level-2A collection only includes items with a “processing baseline” value of 05.00 or higher. All newly-acquired scenes have this processing baseline, but when back-processing the catalog, about half of the scenes have an older baseline and are immediately ignored. This meant that half of the Batch Jobs ran for seconds and half ran for 5-10 minutes. The Batch Jobs that ran for seconds caused a significant increase in cost and decrease in throughput. A better solution would have been an initial Lambda that checked only whether the processing baseline was appropriate, and only allowed the Batch Job to run if it was (similar to the `Choice` pattern in “Conditionally Using Batch or Lambda” below).
Another consideration is with error handling and `InvalidInput`s. Tasks that raise `InvalidInput` exceptions are indicating that the payload can never be processed correctly, so retrying is pointless. A trivial example of this would be a process payload with only an ID value and no other information, as shown below. This is contrasted with a valid payload that fails because of something that can be corrected, such as a code bug or a transient infrastructure error.
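For instance, a payload like the following hypothetical sketch (the ID value is illustrative) gives a task nothing to work with and can never succeed no matter how many times it is retried, so raising `InvalidInput` rather than retrying is the right behavior:

```json
{
    "id": "example-collection/workflow-example/some-item-id"
}
```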
## Minimal example
Let’s see an example of a workflow using a `Parallel` block to group the set of Batch operations together. Notice how the example uses `parallel` with only a single branch defined, but that fits the Batch use-case perfectly.
Example:

```yaml
name: '#{AWS::StackName}-batch-example'
definition:
  Comment: "Example workflow using parallel to make a 'batch group'"
  StartAt: batch-group
  States:
    batch-group:
      Type: Parallel
      Branches:
        - StartAt: pre-batch
          States:
            pre-batch:
              Type: Task
              Resource: !GetAtt pre-batch.Arn
              Next: batch-task
              Retry:
                - ErrorEquals: ["Lambda.TooManyRequestsException", "Lambda.Unknown"]
                  IntervalSeconds: 10
                  MaxDelaySeconds: 86400
                  BackoffRate: 2.0
                  MaxAttempts: 20
                  JitterStrategy: FULL
            batch-task:
              Type: Task
              Resource: arn:aws:states:::batch:submitJob.sync
              Parameters:
                JobName: some-batch-job
                JobQueue: "#{ExampleJobQueue}"
                JobDefinition: "#{ExampleBatchJob}"
                # Note that this passes the value of the `url` key in the
                # step's input JSON to the job definition as the parameter
                # `url`.
                Parameters:
                  url.$: "$.url"
              Next: post-batch
              Retry:
                - ErrorEquals: ["Batch.AWSBatchException"]
                  IntervalSeconds: 600
                  MaxDelaySeconds: 86400
                  BackoffRate: 2.0
                  MaxAttempts: 20
                  JitterStrategy: FULL
              Catch:
                # Ensures we always go to post-batch to pull errors
                - ErrorEquals: ["States.ALL"]
                  ResultPath: $.error
                  Next: post-batch
            post-batch:
              Type: Task
              Resource: !GetAtt post-batch.Arn
              # End of the branch, not the step function
              End: True
              Retry:
                - ErrorEquals: ["Lambda.TooManyRequestsException", "Lambda.Unknown"]
                  IntervalSeconds: 10
                  MaxDelaySeconds: 86400
                  BackoffRate: 2.0
                  MaxAttempts: 20
                  JitterStrategy: FULL
      Next: publish
      # Parallel output is always an array of the outputs from each branch.
      # We can use the OutputPath selector to get output index 0 as we only
      # have a single branch, so we don't pass an array as input to the
      # next task.
      OutputPath: $[0]
      Retry:
        # This policy retries the whole branch after any error
        - ErrorEquals: ["States.ALL"]
          MaxAttempts: 3
          IntervalSeconds: 1200
          MaxDelaySeconds: 86400
          BackoffRate: 2.0
          JitterStrategy: FULL
      Catch:
        # If the branch still fails after all retries, fail the workflow
        - ErrorEquals: ["States.ALL"]
          ResultPath: $.error
          Next: failure
    publish:
      Type: Task
      Resource: !GetAtt publish.Arn
      End: True
      Retry:
        - ErrorEquals: ["Lambda.TooManyRequestsException", "Lambda.Unknown"]
          IntervalSeconds: 10
          MaxDelaySeconds: 86400
          BackoffRate: 2.0
          MaxAttempts: 20
          JitterStrategy: FULL
      Catch:
        - ErrorEquals: ["States.ALL"]
          ResultPath: $.error
          Next: failure
    failure:
      Type: Fail
```
## Batch retries vs step function retries
Whenever possible, using the step function retry semantics over those provided by Batch is preferred. While Batch retries can be used without having to manage the additional complexity of the `parallel` block, Batch retries fire regardless of error type, while step function retries can match specific error types, allowing users more granular control over when to retry or fail.
Additionally, retrying within the step function shows the retry as a separate step from the first attempt. This makes it much more obvious to users investigating failures that a retry happened and what the initial error was. Batch retries are more or less hidden from the step function.
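For contrast, a Batch-level retry is configured on the job definition itself and applies to any failure. A minimal CloudFormation sketch is below; the `ExampleBatchJob` resource name is carried over from the example above, and the container properties are elided.

```yaml
ExampleBatchJob:
  Type: AWS::Batch::JobDefinition
  Properties:
    Type: container
    # ContainerProperties (image, vcpus, memory, etc.) omitted
    RetryStrategy:
      # Re-runs the job up to 3 times on ANY failure; no error-type
      # matching, and no trace of the retries in the step function
      Attempts: 3
```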
For these reasons, the overhead of the `parallel` block is worth the investment.
## Conditionally Using Batch or Lambda
Tasks can be configured to use either Batch or Lambda, and the specific one to use can then be specified in the payload and selected by the workflow.

The payload should include a field like `batch` with a boolean indicating whether to use Batch (`false`, or absent, meaning Lambda):
```json
{
    "process": {
        ...
        "tasks": {"foo-to-stac": {"batch": true}},
        ...
    }
}
```
Then in the workflow, this field can be used to drive a `Choice` block that selects either the Batch or Lambda path:
```yaml
definition:
  StartAt: batch-or-lambda
  States:
    batch-or-lambda:
      Type: Choice
      Choices:
        - Variable: "$.process.tasks.foo-to-stac.batch"
          IsPresent: false
          Next: foo-to-stac-lambda
        - Variable: "$.process.tasks.foo-to-stac.batch"
          BooleanEquals: false
          Next: foo-to-stac-lambda
        - Variable: "$.process.tasks.foo-to-stac.batch"
          BooleanEquals: true
          Next: batch-group
```
In this case, `foo-to-stac-lambda` is a Task block that defines the Lambda path and `batch-group` is a Task or Parallel block that defines the Batch path.
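To round out the picture, a minimal sketch of what the Lambda path might look like is below. The `foo-to-stac` function name and the `publish` next step are assumptions carried over from the earlier example, and `batch-group` would be the `Parallel` group shown in the minimal example above.

```yaml
foo-to-stac-lambda:
  Type: Task
  Resource: !GetAtt foo-to-stac.Arn
  # Both the Lambda and Batch paths converge on the same next step
  Next: publish
  Retry:
    - ErrorEquals: ["Lambda.TooManyRequestsException", "Lambda.Unknown"]
      IntervalSeconds: 10
      BackoffRate: 2.0
      MaxAttempts: 20
```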