Creating a stream processor
Stream processors do most of the heavy lifting in cloud-native services. Autonomous cloud-native services perform all inter-service communication asynchronously via event streaming to decouple upstream services from downstream services. Upstream services publish events to a stream, with no knowledge of the specific downstream services that will eventually consume the events. Downstream services deploy stream-processing functions to consume events of interest. Stream processors will be covered extensively throughout this cookbook. This recipe demonstrates how to create a function that listens for events from an AWS Kinesis stream and provides a quick introduction to using the functional reactive programming paradigm for implementing stream processing.
Getting ready
Before starting this recipe, you will need an AWS Kinesis Stream, such as the one created in the Creating an event stream recipe.
How to do it...
- Create the project from the following template:
$ sls create --template-url https://github.com/danteinc/js-cloud-native-cookbook/tree/master/ch1/create-stream-processor --path cncb-create-stream-processor
- Navigate to the cncb-create-stream-processor directory with cd cncb-create-stream-processor.
- Review the file named serverless.yml with the following content:
service: cncb-create-stream-processor

provider:
  name: aws
  runtime: nodejs8.10

functions:
  listener:
    handler: handler.listener
    events:
      - stream:
          type: kinesis
          arn: ${cf:cncb-event-stream-${opt:stage}.streamArn}
          batchSize: 100
          startingPosition: TRIM_HORIZON
- Review the file named handler.js with the following content:
const _ = require('highland');

// Lambda entry point: receives a batch of Kinesis records in event.Records.
module.exports.listener = (event, context, cb) => {
  _(event.Records)
    .map(recordToEvent)
    .tap(printEvent)
    .filter(forThingCreated)
    .collect()
    .tap(printCount)
    .toCallback(cb);
};

// Decode the Base64 payload and parse the JSON into an event object.
const recordToEvent = r => JSON.parse(Buffer.from(r.kinesis.data, 'base64'));
// Keep only the event type this processor is interested in.
const forThingCreated = e => e.type === 'thing-created';
const printEvent = e => console.log('event: %j', e);
const printCount = events => console.log('count: %d', events.length);
- Install the dependencies with npm install.
- Run the tests with npm test -- -s $MY_STAGE.
- Review the contents generated in the .serverless directory.
- Deploy the stack:
$ npm run dp:lcl -- -s $MY_STAGE
> cncb-create-stream-processor@1.0.0 dp:lcl <path-to-your-workspace>/cncb-create-stream-processor
> sls deploy -r us-east-1 "-s" "john"
Serverless: Packaging service...
...
Serverless: Stack update finished...
Service Information
service: cncb-create-stream-processor
stage: john
region: us-east-1
stack: cncb-create-stream-processor-john
...
functions:
listener: cncb-create-stream-processor-john-listener
- Review the stack and function in the AWS Console.
- Publish an event from a separate Terminal with the following commands:
$ cd <path-to-your-workspace>/cncb-event-stream
$ sls invoke -r us-east-1 -f publish -s $MY_STAGE -d '{"type":"thing-created"}'
{
"ShardId": "shardId-000000000000",
"SequenceNumber": "49582906351415672136958521360120605392824155736450793474"
}
- Take a look at the logs from the original Terminal:
$ sls logs -f listener -r us-east-1 -s $MY_STAGE
START ...
2018-03-25 00:16:32 ... event:
{
"type":"thing-created",
"id":"81fd8920-2fdb-11e8-b749-0d2c43ec73d0",
"partitionKey":"6f4f9a38-61f7-41c9-a3ad-b8c16e42db7c",
"timestamp":1521948046003,
"tags":{
"region":"us-east-1"
}
}
2018-03-25 00:16:32 ... event:
{
"type":"thing-created",
"id":"c6f60550-2fdd-11e8-b749-0d2c43ec73d0",
...
}
2018-03-25 00:16:32 ... count: 2
END ...
REPORT ... Duration: 7.73 ms Billed Duration: 100 ms ... Max Memory Used: 22 MB
START ...
2018-03-25 00:22:22 ... event:
{
"type":"thing-created",
"id":"1c2b5150-2fe4-11e8-b749-0d2c43ec73d0",
...
}
2018-03-25 00:22:22 ... count: 1
END ...
REPORT ... Duration: 1.34 ms Billed Duration: 100 ms ... Max Memory Used: 22 MB
- Remove the stack once you are finished with npm run rm:lcl -- -s $MY_STAGE.
How it works...
Stream processors listen for data from a streaming service such as Kinesis or DynamoDB Streams. Deploying a stream processor is completely declarative. We configure a function with the stream event type and the pertinent settings, such as the type, arn, batchSize, and startingPosition. The arn is set dynamically using a CloudFormation variable, ${cf:cncb-event-stream-${opt:stage}.streamArn}, that references the output value of the cncb-event-stream stack.
Streams are the only resources that are shared between autonomous cloud-native services.
We will discuss batch size and starting position in detail in both Chapter 8, Designing for Failure, and Chapter 9, Optimizing Performance. For now, you may have noticed that the new stream processor logged all the events that were published to the stream in the last 24 hours. This is because the startingPosition is set to TRIM_HORIZON. If it were set to LATEST, the function would only receive events that were published after it was created.
Stream processing is a perfect match for functional reactive programming with Node.js streams. The terminology can be a little confusing because the word stream is overloaded. I like to think of streams as either macro or micro. For example, Kinesis is the macro stream and the code in our stream processor function is the micro stream. My favorite library for implementing the micro stream is Highland.js (https://highlandjs.org). A popular alternative is RxJS (https://rxjs-dev.firebaseapp.com). As you can see in this recipe, functional reactive programming is very descriptive and readable. One of the reasons for this is that there are no loops. If you try to implement a stream processor with imperative programming, you will find that it quickly gets very messy. You also lose backpressure, which we will discuss in Chapter 8, Designing for Failure.
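To make the pull-based nature of a micro stream more concrete, here is a minimal sketch that uses plain JavaScript generators instead of Highland.js. The helper names (fromArray, map, filter) are hypothetical, and this is not how Highland.js is implemented. It only illustrates that each element travels through every step before the next one is pulled, which is the property that backpressure builds on.

```javascript
// Hypothetical helpers: a tiny pull-based pipeline built from generators.
// The trace array records the order in which work actually happens.
const trace = [];

function* fromArray(items) {
  for (const item of items) {
    trace.push(`pull ${item}`);
    yield item;
  }
}

function* map(fn, source) {
  for (const item of source) yield fn(item);
}

function* filter(predicate, source) {
  for (const item of source) if (predicate(item)) yield item;
}

const pipeline = filter(
  n => n > 10,
  map(n => { trace.push(`map ${n}`); return n * 10; }, fromArray([1, 2, 3]))
);

// Nothing has run yet: generators are lazy until a consumer pulls.
const results = [...pipeline];

console.log(results); // [ 20, 30 ]
console.log(trace);   // [ 'pull 1', 'map 1', 'pull 2', 'map 2', 'pull 3', 'map 3' ]
```

The trace shows the steps interleaving one element at a time, rather than the whole array being mapped before anything is filtered.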
The code in the listener function creates a pipeline of steps that the data from the Kinesis stream will ultimately flow through. The first step, _(event.Records), converts the array of Kinesis records into a Highland.js stream object that will allow each element in the array to be pulled through the stream in turn as the downstream steps are ready to receive the next element. The .map(recordToEvent) step decodes the Base64 encoded data from the Kinesis record and parses the JSON into an event object. The next step, .tap(printEvent), simply logs the event so that we can see what is happening in the recipe.
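The decoding step can be tried in isolation. The following sketch builds a record in the shape the function receives (only the kinesis.data field is shown; real records carry additional metadata) and runs it through the same logic as recordToEvent. The sample payload is made up for illustration.

```javascript
// Same decoding logic as the recipe's recordToEvent, with an explicit toString.
const recordToEvent = r =>
  JSON.parse(Buffer.from(r.kinesis.data, 'base64').toString('utf8'));

// Hypothetical sample event; Kinesis delivers the payload Base64 encoded.
const sample = { type: 'thing-created', id: 'example-id' };
const record = {
  kinesis: {
    data: Buffer.from(JSON.stringify(sample)).toString('base64'),
  },
};

const decoded = recordToEvent(record);
console.log('event: %j', decoded);
```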
Kinesis, like event streaming in general, belongs to the high-performance, dumb-pipe-smart-endpoints generation of messaging middleware. This means that Kinesis, the dumb pipe, does not waste its processing power on filtering data for the endpoints. Instead, all that logic is spread out across the processing power of the smart endpoints. Our stream processor function is the smart endpoint. To that end, the .filter(forThingCreated) step is responsible for filtering out the events that the processor is not interested in. All the remaining steps can assume that they are receiving the expected event types.
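Because filtering lives in the endpoint, each processor decides for itself which event types it cares about. As an illustration only, a processor interested in more than one type could generalize forThingCreated into a predicate factory. The forTypes helper and the thing-updated and thing-deleted type names below are hypothetical and not part of the recipe.

```javascript
// Hypothetical helper: builds a predicate that accepts any of the given types.
const forTypes = (...types) => {
  const wanted = new Set(types);
  return e => wanted.has(e.type);
};

const forThingChanges = forTypes('thing-created', 'thing-updated');

const events = [
  { type: 'thing-created' },
  { type: 'thing-deleted' },
  { type: 'thing-updated' },
];

// With Highland.js the predicate would be passed to .filter(forThingChanges);
// a plain array is used here so the sketch runs without dependencies.
const accepted = events.filter(forThingChanges);
console.log(accepted.map(e => e.type));
```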
Our bare-bones stream processor needs something simple but somewhat interesting to do, so we count and print the number of thing-created events in the batch. We have filtered out all other event types, so the .collect() step collects all the remaining events into an array. Then, the .tap(printCount) step logs the length of the array. Finally, the .toCallback(cb) step will invoke the callback function once all the data in the batch has been processed. At this point, the Kinesis checkpoint is advanced and the next batch of events is processed. We will cover error handling and how it relates to batches and checkpoints in Chapter 8, Designing for Failure.
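The batch-in, callback-out contract can be exercised locally. The sketch below is a stand-in, not the recipe's handler: it reproduces the same steps with plain array methods so that it runs without Highland.js, and it feeds the function a hand-built event. The toRecord helper and the sample payloads are assumptions for illustration.

```javascript
// Stand-in for the recipe's listener, using arrays instead of Highland.js.
const listener = (event, context, cb) => {
  let events;
  try {
    events = event.Records
      .map(r => JSON.parse(Buffer.from(r.kinesis.data, 'base64').toString('utf8')))
      .filter(e => e.type === 'thing-created');
  } catch (err) {
    return cb(err); // a failure is reported through the same callback
  }
  console.log('count: %d', events.length);
  return cb(null, events); // signal that the whole batch was processed
};

// Hypothetical helper that wraps a payload the way Kinesis would deliver it.
const toRecord = payload => ({
  kinesis: { data: Buffer.from(JSON.stringify(payload)).toString('base64') },
});

const batch = {
  Records: [{ type: 'thing-created' }, { type: 'thing-deleted' }].map(toRecord),
};

// Invoke the handler the way Lambda would, capturing what the callback receives.
let outcome;
listener(batch, {}, (err, result) => {
  outcome = { err, result };
});
console.log(outcome.result.length);
```

The Highland.js documentation describes toCallback as expecting the stream to emit at most one value, which is one more reason the recipe calls .collect() before it.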