The Problem
Imagine your code waits for some external system to respond. In the non-serverless world, you’d most likely just add a Quartz/cron-like job that periodically checks whether the external processing has finished. That’s easy, as you have the server running 24/7.
But how would you do this in the AWS serverless world? There are a couple of solutions, such as adding a CloudWatch trigger, or using DynamoDB TTL and handling the events emitted when an entry is deleted.
However, in this post, I’ll focus on using AWS SQS (Simple Queue Service).
Solution
The whole idea is built around an SQS feature called Visibility Timeout. When a consumer receives a message, SQS hides it from other consumers for this period. If the message is not deleted (acknowledged) before the timeout expires, it becomes visible in the queue again so another listener can pick it up. Take a look at the following image from Amazon:
How can we use it for job scheduling?
The idea is that:
- we send an SQS message whenever a new job is scheduled,
- after 5 seconds (DelaySeconds: 5) the message becomes visible in the queue,
- another Lambda listens on this queue and tries to process it.
Now, if the external system:
- already has the response we’re waiting for, we do our job and acknowledge the message so it vanishes from SQS,
- is not ready, we throw an exception, meaning that we want to retry after 5 seconds (VisibilityTimeout: 5).
In the second case, the message is not acknowledged (it is still in SQS) and becomes visible to another listener after those 5 seconds. We retry it up to 10 times (the RedrivePolicy and its maxReceiveCount: 10), and if there is still no reply from the external system, SQS moves the message to the DLQ (Dead Letter Queue) for further analysis.
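To make the timing concrete, here is a toy, in-memory model of the three queue settings (DelaySeconds, VisibilityTimeout and maxReceiveCount). It is a simplification, not the AWS SDK: time is a plain number of seconds, messages are plain objects instead of receipt handles, and processing takes no time. Simulating a task whose external system never answers shows ten attempts, five seconds apart, before the message is moved to the DLQ.

```javascript
// Toy, in-memory model of the SQS settings from this post. NOT the AWS SDK:
// time is a plain number of seconds supplied by the caller.
class ToyQueue {
  constructor({ delaySeconds, visibilityTimeout, maxReceiveCount }) {
    this.delaySeconds = delaySeconds;
    this.visibilityTimeout = visibilityTimeout;
    this.maxReceiveCount = maxReceiveCount;
    this.messages = [];
    this.dlq = [];
  }

  send(body, now) {
    // DelaySeconds: a new message stays hidden for a while after being sent
    this.messages.push({ body, visibleAt: now + this.delaySeconds, receiveCount: 0 });
  }

  // Like ReceiveMessage: returns the first visible message (or null) and
  // hides it for VisibilityTimeout seconds.
  receive(now) {
    for (const message of [...this.messages]) {
      if (message.visibleAt > now) continue;
      if (message.receiveCount >= this.maxReceiveCount) {
        // RedrivePolicy: already received maxReceiveCount times, move it to the DLQ
        this.remove(message);
        this.dlq.push(message);
        continue;
      }
      message.receiveCount += 1;
      message.visibleAt = now + this.visibilityTimeout;
      return message;
    }
    return null;
  }

  // Acknowledging a message in SQS means deleting it.
  delete(message) {
    this.remove(message);
  }

  remove(message) {
    const index = this.messages.indexOf(message);
    if (index !== -1) this.messages.splice(index, 1);
  }
}

// A task whose external system never answers: every attempt "fails", so the
// consumer never deletes the message. Poll once per simulated second.
const queue = new ToyQueue({ delaySeconds: 5, visibilityTimeout: 5, maxReceiveCount: 10 });
queue.send('My Scheduled Task', 0);
const attempts = [];
let movedToDlqAt = null;
for (let t = 0; t <= 60; t++) {
  if (queue.receive(t)) attempts.push(t);
  if (movedToDlqAt === null && queue.dlq.length > 0) movedToDlqAt = t;
}
console.log(attempts.join(', ')); // 5, 10, 15, 20, 25, 30, 35, 40, 45, 50
console.log(movedToDlqAt);        // 55
```

In the real setup, each attempt also includes the Lambda's own run time and polling latency, so the actual timestamps drift somewhat later than this model suggests.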
So, on to the code!
Preparation
I’m assuming you have the Serverless Framework and the AWS CLI installed. If not, please refer to their guides (Serverless Framework and AWS CLI) to prepare your environment.
The AWS Lambda code is written for the Node.js 10.x runtime.
Code
I have put the sample code in my GitHub repository here.
It consists of:
aws.yaml
It’s a CloudFormation template that you need to deploy using the AWS Console or the AWS CLI to create the SQS queue and its accompanying DLQ (Dead Letter Queue).
(We could do this directly in the Serverless Framework, but I wanted to get some hands-on experience with CloudFormation, hence this approach.)
AWSTemplateFormatVersion: 2010-09-09
Description: >
  Sample SQS queue with DLQ to be used by Lambda and act like a job that checks
  if some external processing has been finished
Parameters:
  QueueNameParam:
    Type: String
    Default: tasks-queue
    Description: Enter the name of the queue. DLQ will be created suffixed "-dlq"
Resources:
  MyTasksQueue:
    Type: 'AWS::SQS::Queue'
    Properties:
      DelaySeconds: 5
      QueueName: !Ref QueueNameParam
      VisibilityTimeout: 5
      RedrivePolicy:
        deadLetterTargetArn: !GetAtt MyTasksDLQ.Arn
        maxReceiveCount: 10
      Tags:
        - Key: created_by
          Value: serverless-lambda-sqs-trigger
  MyTasksDLQ:
    Type: 'AWS::SQS::Queue'
    Properties:
      QueueName: !Join ['-', [!Ref QueueNameParam, 'dlq']]
      Tags:
        - Key: created_by
          Value: serverless-lambda-sqs-trigger
Outputs:
  QueueArn:
    Description: ARN of the SQS Queue to be used in Serverless Framework YAML config file
    Value: !GetAtt MyTasksQueue.Arn
  QueueUrl:
    Description: URL of the SQS Queue to be used in AWS SQS API
    Value: !Ref MyTasksQueue
After you create a stack from the CloudFormation template, its Outputs section shows two values:
- the SQS ARN (Amazon Resource Name) and
- the SQS URL.
Write them down, as you’ll need them when configuring the Serverless Framework.
You can adjust the name of the queue to be created using the QueueNameParam parameter; it defaults to tasks-queue. For example, to create the stack from the AWS CLI with a custom queue name:
$ aws cloudformation create-stack --stack-name "MySqsLambdaStack" --template-body file://aws.yaml --parameters ParameterKey=QueueNameParam,ParameterValue=my-custom-queue-name
You can also check the results of the stack creation without opening the AWS Console:
$ aws cloudformation describe-stacks --stack-name "MySqsLambdaStack"
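Instead of copying the two values by hand, you could extract them from the JSON that describe-stacks prints. The sketch below is a hypothetical helper, not part of the original project; it relies only on the response shape (Stacks[0].Outputs, a list of OutputKey/OutputValue pairs) and on the QueueArn and QueueUrl output names defined in aws.yaml.

```javascript
// Hypothetical helper: picks the QueueArn and QueueUrl outputs (names from
// aws.yaml) out of the JSON printed by `aws cloudformation describe-stacks`.
function readQueueOutputs(describeStacksJson) {
  const response = JSON.parse(describeStacksJson);
  const stack = response.Stacks && response.Stacks[0];
  if (!stack) {
    throw new Error('No stack found in describe-stacks output');
  }
  // Turn [{ OutputKey, OutputValue }, ...] into a plain lookup object
  const outputs = {};
  for (const { OutputKey, OutputValue } of stack.Outputs || []) {
    outputs[OutputKey] = OutputValue;
  }
  if (!outputs.QueueArn || !outputs.QueueUrl) {
    throw new Error('QueueArn/QueueUrl outputs not found; has the stack finished creating?');
  }
  return { queueArn: outputs.QueueArn, queueUrl: outputs.QueueUrl };
}
```

For example, save the output with aws cloudformation describe-stacks --stack-name "MySqsLambdaStack" --output json > stack.json and call readQueueOutputs(require('fs').readFileSync('stack.json', 'utf8')).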
/serverless
It’s a Serverless Framework project containing two AWS Lambda functions:
- createTask, triggered by HTTP POST /tasks: the entry point that schedules a task,
- executeTask, triggered by SQS: acts as the job executor. This is a trivial implementation that randomly simulates whether the external system has finished processing or not.
service: serverless-lambda-sqs-trigger
provider:
  name: aws
  runtime: nodejs10.x
  region: eu-central-1
  memorySize: 128
  iamRoleStatements:
    - Effect: "Allow"
      Action:
        - "sqs:SendMessage"
        - "sqs:GetQueueUrl"
      Resource: ${self:custom.sqsTasksArn}
  environment:
    SQS_TASKS_URL: "PUT YOUR SQS TASK URL HERE"
functions:
  createTask:
    handler: handler.createTask
    events:
      - http: POST /tasks
  executeTask:
    timeout: 5
    handler: handler.executeTask
    events:
      - sqs:
          arn: ${self:custom.sqsTasksArn}
          batchSize: 1
custom:
  sqsTasksArn: "PUT YOUR SQS TASK ARN HERE"
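One detail worth checking in this configuration is how the queue's VisibilityTimeout relates to executeTask's timeout. Lambda will not create an SQS trigger when the queue's visibility timeout is shorter than the function's timeout, and the AWS Lambda documentation recommends a visibility timeout of at least six times the function timeout. The hypothetical helper below encodes those two rules; with this post's values it reports a warning, which is the trade-off of using the visibility timeout directly as the retry interval.

```javascript
// Hypothetical helper: compares an SQS queue's VisibilityTimeout with the
// timeout of the Lambda function it triggers (both in seconds).
function checkSqsTriggerTimeouts(visibilityTimeout, functionTimeout) {
  if (visibilityTimeout < functionTimeout) {
    // Lambda rejects such an SQS event source mapping when it is created
    return { level: 'error', reason: 'visibility timeout is shorter than the function timeout' };
  }
  if (visibilityTimeout < 6 * functionTimeout) {
    // AWS guidance: at least six times the function timeout
    return { level: 'warning', reason: 'visibility timeout is below 6x the function timeout' };
  }
  return { level: 'ok', reason: 'timeouts follow the AWS guidance' };
}

// VisibilityTimeout: 5 (aws.yaml) and timeout: 5 (executeTask in serverless.yml)
console.log(checkSqsTriggerTimeouts(5, 5).level); // warning
```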
And here is the implementation of the Lambdas themselves (handler.js):
'use strict';
const AWS = require('aws-sdk');
const SQS = new AWS.SQS({apiVersion: '2012-11-05'});
const queueUrl = process.env.SQS_TASKS_URL;
module.exports.createTask = async event => {
  console.info("Scheduling a task for later invocation.");
  const params = {
    MessageBody: JSON.stringify("My Scheduled Task"),
    QueueUrl: queueUrl
  };
  try {
    // aws-sdk v2: .promise() turns the request into a Promise we can await
    await SQS.sendMessage(params).promise();
  } catch (err) {
    // Throwing from the async handler fails the invocation, so the caller
    // gets an error response instead of a 202
    console.error("Task not scheduled", err);
    throw new Error("Task not scheduled");
  }
  return {
    statusCode: 202,
    body: "Task scheduled"
  };
};
module.exports.executeTask = async event => {
  console.info('Received task to be executed.');
  // We can do "[0]" thanks to "batchSize: 1"; otherwise, be prepared for a list of events!
  const result = tryToProcess(event.Records[0].body);
  // Returning normally acknowledges the message: Lambda deletes it from the queue
  console.info("Correctly processed the message with result: %s", result);
};
// This simulates some business logic process - here we're just getting some random responses
const tryToProcess = (requestData) => {
  if (Math.round(Math.random()) === 0) {
    // A thrown error fails the invocation, so the message stays in the queue and is retried
    throw new Error("Need to wait for some third-party system, try again later");
  }
  return "Success";
};
You need to put the SQS queue values you wrote down after running the CloudFormation template into sqsTasksArn (the ARN) and SQS_TASKS_URL (the URL).
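To try executeTask before deploying, the Serverless Framework can run a handler locally with sls invoke local -f executeTask -p event.json. The sketch below builds a minimal SQS-shaped event for that file. It fills in only a subset of the fields Lambda really sends, and the message IDs, receipt handles, account number and ARN are placeholders, not real values.

```javascript
// Builds a minimal SQS-shaped Lambda event for local experiments.
// IDs, receipt handles and the default ARN are placeholders.
function makeSqsEvent(bodies, queueArn = 'arn:aws:sqs:eu-central-1:123456789012:tasks-queue') {
  return {
    Records: bodies.map((body, index) => ({
      messageId: `local-message-${index + 1}`,
      receiptHandle: `local-receipt-${index + 1}`,
      // createTask sends JSON.stringify(...) as the body, so mirror that here
      body: JSON.stringify(body),
      attributes: { ApproximateReceiveCount: '1' },
      messageAttributes: {},
      eventSource: 'aws:sqs',
      eventSourceARN: queueArn,
      awsRegion: 'eu-central-1',
    })),
  };
}

// Print an event with the same body createTask sends; save it as event.json.
console.log(JSON.stringify(makeSqsEvent(['My Scheduled Task']), null, 2));
```

Because handler.js requires aws-sdk when it loads, running it locally assumes that package is installed in the project.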
Deploy
Enter the serverless directory and deploy the Serverless application:
$ sls deploy
After a successful deployment, you’ll see the AWS API Gateway URL you can use to simulate a job being scheduled.
Invoke and Observe the Results
Now simulate a few tasks being scheduled by invoking the POST /tasks endpoint:
$ curl -X POST https://{your-api-gw-url}/tasks
All requests should finish with HTTP 202 (Accepted), meaning the job has been scheduled in the form of an SQS message.
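If you’d rather schedule several tasks in one go than repeat the curl command, a small Node.js sketch can send the same POST requests. The function name scheduleTasks is hypothetical; it relies on the global fetch available in Node.js 18 and later (or any fetch-compatible function you pass in), and baseUrl is the API Gateway URL printed by sls deploy.

```javascript
// Hypothetical helper: sends `count` POST /tasks requests one after another
// and returns the HTTP status codes. `fetchFn` defaults to the global fetch
// (Node.js 18+); pass your own fetch-compatible function otherwise.
async function scheduleTasks(baseUrl, count, fetchFn = globalThis.fetch) {
  const statuses = [];
  for (let i = 0; i < count; i++) {
    const response = await fetchFn(`${baseUrl}/tasks`, { method: 'POST' });
    statuses.push(response.status);
  }
  return statuses;
}
```

For example, scheduleTasks('https://{your-api-gw-url}', 5).then(console.log) should print five 202 status codes if scheduling works.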
You can now observe the logs using CloudWatch or directly with the Serverless Framework:
$ sls logs -tf executeTask
Randomly, some tasks should end up:
- finished: simulating the external service having our results, so the SQS message is acknowledged,
- in progress: simulating the external service still needing some time, so the SQS message is not acknowledged and waits to be processed again by another invocation once the Visibility Timeout has passed.
See example output below:
Summary
You have just created a very simple job executor where you pay only for the actual processing time (i.e., the code is invoked only when there is a process we’re waiting for).
If nothing is pending in the external system, no Lambda invocations happen, so there are no compute costs on your side.
In the end, there are a couple of things you might consider adjusting:
- The job is always retried after the same, constant Visibility Timeout. Exponential retry times might be a better approach. This can be achieved by setting a visibility timeout on the message itself (take a look at the docs).
- No code handles the DLQ messages. A separate AWS Lambda could be triggered when a message arrives in the DLQ to, e.g., mark the task as failed or set a higher visibility timeout and retry one last time.
- When the external system is not ready, we throw an exception, which shows up in the logs as an ugly Error. It would be better to simply mark the message as not processed (not acknowledged) without throwing an error.
- Using SQS as a Lambda trigger can cause you to hit some concurrency limits. For more information, take a look at a great post by Zac Charles (also take a look at part II and the comments from Jeremy Daly!).
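The first and third points above can be sketched together. The sketch assumes two things the original project does not use: Lambda's partial batch response feature (ReportBatchItemFailures), which lets a handler return { batchItemFailures: [{ itemIdentifier: messageId }] } instead of throwing, and an aws-sdk v2-style client exposing changeMessageVisibility(params).promise(). The names makeExecuteTask and backoffSeconds, the 900-second cap, and the boolean-returning tryToProcess are hypothetical. For a record that is not ready yet, it reads the ApproximateReceiveCount attribute Lambda includes in SQS records, extends that message's visibility timeout exponentially (5 s, 10 s, 20 s, ...), and reports only that message as failed.

```javascript
// Sketch only. Assumes ReportBatchItemFailures is enabled on the SQS trigger
// and that `sqs` is an aws-sdk v2-style client (changeMessageVisibility(...).promise()).
const MAX_VISIBILITY_TIMEOUT = 43200; // SQS maximum: 12 hours, in seconds

// Exponential backoff: 5 s, 10 s, 20 s, ... for receive counts 1, 2, 3, ...,
// capped at maxSeconds (hypothetical default of 15 minutes).
function backoffSeconds(receiveCount, baseSeconds = 5, maxSeconds = 900) {
  const exponent = Math.max(0, receiveCount - 1);
  return Math.min(baseSeconds * 2 ** exponent, maxSeconds, MAX_VISIBILITY_TIMEOUT);
}

// Hypothetical factory for an executeTask variant; here tryToProcess returns
// true when the external system is ready instead of throwing.
function makeExecuteTask({ sqs, queueUrl, tryToProcess }) {
  return async (event) => {
    const batchItemFailures = [];
    for (const record of event.Records) {
      if (tryToProcess(record.body)) {
        continue; // not reported as failed, so Lambda deletes the message
      }
      const receiveCount = Number(record.attributes.ApproximateReceiveCount);
      // Keep this message hidden for longer before its next attempt
      await sqs.changeMessageVisibility({
        QueueUrl: queueUrl,
        ReceiptHandle: record.receiptHandle,
        VisibilityTimeout: backoffSeconds(receiveCount),
      }).promise();
      // Report the message as failed instead of throwing an Error
      batchItemFailures.push({ itemIdentifier: record.messageId });
    }
    return { batchItemFailures };
  };
}
```

Two caveats. ReportBatchItemFailures must be enabled on the SQS trigger (in recent Serverless Framework versions, functionResponseType: ReportBatchItemFailures on the sqs event); otherwise a normal return counts as success and Lambda deletes every message in the batch. The function would also need the sqs:ChangeMessageVisibility permission, which the iamRoleStatements in the Serverless configuration above do not grant.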
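For the second point, a DLQ consumer could look roughly like the sketch below. makeDlqHandler and the markTaskAsFailed callback are hypothetical (the callback could, for example, update a task table). Wiring it up would need another function with an sqs event pointing at the DLQ’s ARN, which aws.yaml does not currently expose as an output.

```javascript
// Hypothetical DLQ consumer. `markTaskAsFailed` is an async callback you
// supply, e.g. a database update.
function makeDlqHandler(markTaskAsFailed) {
  return async (event) => {
    for (const record of event.Records) {
      // createTask sends JSON.stringify("My Scheduled Task"), so parse it back
      const task = JSON.parse(record.body);
      // If this throws, the invocation fails and the message stays in the DLQ
      await markTaskAsFailed({ messageId: record.messageId, task });
    }
  };
}

// Possible wiring in handler.js (saveFailure is hypothetical):
// module.exports.handleFailedTask = makeDlqHandler(saveFailure);
```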