feat(AWS Lambda): Add support for self-managed kafka event

This commit is contained in:
lewgordon 2021-01-25 15:41:51 -05:00 committed by GitHub
parent cd5a739265
commit ff605018a7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 761 additions and 0 deletions

View File

@ -0,0 +1,86 @@
<!--
title: Serverless Framework - AWS Lambda Events - Self-managed Apache Kafka
menuText: Kafka
description: Setting up AWS self-managed Apache Kafka Events with AWS Lambda via the Serverless Framework
layout: Doc
-->
<!-- DOCS-SITE-LINK:START automatically generated -->
### [Read this on the main serverless docs site](https://www.serverless.com/framework/docs/providers/aws/events/kafka)
<!-- DOCS-SITE-LINK:END -->
# Kafka
A self-managed Apache Kafka cluster can be used as an event source for AWS Lambda.
## Simple event definition
In the following example, we specify that the `compute` function should be triggered whenever there are new messages available to consume from defined Kafka `topic`.
In order to configure `kafka` event, you have to provide three required properties:
- `accessConfigurations`, which is either secret credentials required to do [SASL_SCRAM auth](https://docs.confluent.io/platform/current/kafka/authentication_sasl/authentication_sasl_scram.html), or this is VPC configuration to allow Lambda to connect to your cluster
- `topic` to consume messages from.
- `bootstrapServers` an array of bootstrap server addresses for your Kafka cluster
```yml
functions:
compute:
handler: handler.compute
events:
- kafka:
accessConfigurations:
saslScram512Auth: arn:aws:secretsmanager:us-east-1:01234567890:secret:MyBrokerSecretName
topic: AWSKafkaTopic
bootstrapServers:
- abc3.xyz.com:9092
- abc2.xyz.com:9092
```
## Using VPC configurations
You can also specify VPC configurations for your event source. The values will be automatically transformed into their corresponding URI values, so it not required to specify the URI prefix. For example, `subnet-0011001100` will be automatically mapped to the value `subnet:subnet-0011001100`.
```yml
functions:
compute:
handler: handler.compute
events:
- kafka:
accessConfigurations:
vpcSubnet:
- subnet-0011001100
- subnet-0022002200
vpcSecurityGroup: sg-0123456789
topic: mytopic
bootstrapServers:
- abc3.xyz.com:9092
- abc2.xyz.com:9092
```
## Enabling and disabling Kafka event
The `kafka` event also supports `enabled` parameter, which is used to control if the event source mapping is active. Setting it to `false` will pause polling for and processing new messages.
In the following example, we specify that the `compute` function's `kafka` event should be disabled.
```yml
functions:
compute:
handler: handler.compute
events:
- kafka:
accessConfigurations:
saslScram512Auth: arn:aws:secretsmanager:us-east-1:01234567890:secret:MyBrokerSecretName
topic: AWSKafkaTopic
bootstrapServers:
- abc3.xyz.com:9092
- abc2.xyz.com:9092
enabled: false
```
## IAM Permissions
The Serverless Framework will automatically configure the most minimal set of IAM permissions for you. However you can still add additional permissions if you need to. Read the official [AWS documentation](https://docs.aws.amazon.com/lambda/latest/dg/kafka-smaa.html) for more information about IAM Permissions for Kafka events.

View File

@ -422,6 +422,14 @@ module.exports = {
return `${normalizedFunctionName}EventSourceMappingMSK${normalizedClusterName}${normalizedTopicName}`;
},
// Kafka
getKafkaEventLogicalId(functionName, topicName) {
const normalizedFunctionName = this.getNormalizedFunctionName(functionName);
// TopicName is trimmed to 158 chars to avoid going over 255 character limit
const normalizedTopicName = this.normalizeNameToAlphaNumericOnly(topicName).slice(0, 158);
return `${normalizedFunctionName}EventSourceMappingKafka${normalizedTopicName}`;
},
// ALB
getAlbTargetGroupLogicalId(functionName, albId, multiValueHeaders) {
return `${this.getNormalizedFunctionName(functionName)}Alb${

View File

@ -0,0 +1,213 @@
'use strict';
class AwsCompileKafkaEvents {
constructor(serverless) {
this.serverless = serverless;
this.provider = this.serverless.getProvider('aws');
this.hooks = {
'package:compileEvents': this.compileKafkaEvents.bind(this),
};
const secretsManagerArnRegex =
'arn:[a-z-]+:secretsmanager:[a-z0-9-]+:\\d+:secret:[A-Za-z0-9/_+=.@-]+';
this.serverless.configSchemaHandler.defineFunctionEvent('aws', 'kafka', {
type: 'object',
properties: {
accessConfigurations: {
type: 'object',
minProperties: 1,
properties: {
vpcSubnet: {
type: 'array',
minItems: 1,
items: {
type: 'string',
pattern: 'subnet-[a-z0-9]+',
},
},
vpcSecurityGroup: {
type: 'array',
minItems: 1,
items: {
type: 'string',
pattern: 'sg-[a-z0-9]+',
},
},
saslScram256Auth: {
type: 'array',
minItems: 1,
items: {
type: 'string',
pattern: secretsManagerArnRegex,
},
},
saslScram512Auth: {
type: 'array',
minItems: 1,
items: {
type: 'string',
pattern: secretsManagerArnRegex,
},
},
},
additionalProperties: false,
},
batchSize: {
type: 'number',
minimum: 1,
maximum: 10000,
},
enabled: {
type: 'boolean',
},
bootstrapServers: {
type: 'array',
minItems: 1,
items: {
type: 'string',
},
},
startingPosition: {
type: 'string',
enum: ['LATEST', 'TRIM_HORIZON'],
},
topic: {
type: 'string',
},
},
additionalProperties: false,
required: ['accessConfigurations', 'bootstrapServers', 'topic'],
});
}
compileKafkaEvents() {
this.serverless.service.getAllFunctions().forEach((functionName) => {
const functionObj = this.serverless.service.getFunction(functionName);
const cfTemplate = this.serverless.service.provider.compiledCloudFormationTemplate;
// It is required to add the following statement in order to be able to connect to Kafka cluster
const ec2Statement = {
Effect: 'Allow',
Action: [
'ec2:CreateNetworkInterface',
'ec2:DescribeNetworkInterfaces',
'ec2:DescribeVpcs',
'ec2:DeleteNetworkInterface',
'ec2:DescribeSubnets',
'ec2:DescribeSecurityGroups',
],
Resource: '*',
};
// The omission of kms:Decrypt is intentional, since we won't know
// which resources should be valid to decrypt. It's also probably
// not best practice to allow '*' for this.
const secretsManagerStatement = {
Effect: 'Allow',
Action: ['secretsmanager:GetSecretValue'],
Resource: [],
};
let hasKafkaEvent = false;
let needsEc2Permissions = false;
functionObj.events.forEach((event) => {
if (!event.kafka) return;
hasKafkaEvent = true;
const { topic, batchSize, enabled } = event.kafka;
const startingPosition = event.kafka.startingPosition || 'TRIM_HORIZON';
const kafkaEventLogicalId = this.provider.naming.getKafkaEventLogicalId(
functionName,
topic
);
const lambdaLogicalId = this.provider.naming.getLambdaLogicalId(functionName);
const dependsOn = this.provider.resolveFunctionIamRoleResourceName(functionObj) || [];
const kafkaResource = {
Type: 'AWS::Lambda::EventSourceMapping',
DependsOn: dependsOn,
Properties: {
FunctionName: {
'Fn::GetAtt': [lambdaLogicalId, 'Arn'],
},
StartingPosition: startingPosition,
SelfManagedEventSource: {
Endpoints: { KafkaBootstrapServers: event.kafka.bootstrapServers },
},
Topics: [topic],
},
};
kafkaResource.Properties.SourceAccessConfigurations = [];
Object.entries(event.kafka.accessConfigurations).forEach(
([accessConfigurationType, accessConfigurationValues]) => {
let type;
let prefix = '';
let needsSecretsManagerPermissions = false;
switch (accessConfigurationType) {
case 'vpcSubnet':
type = 'VPC_SUBNET';
prefix = 'subnet:';
needsEc2Permissions = true;
break;
case 'vpcSecurityGroup':
type = 'VPC_SECURITY_GROUP';
prefix = 'security_group:';
needsEc2Permissions = true;
break;
case 'saslScram256Auth':
type = 'SASL_SCRAM_256_AUTH';
needsSecretsManagerPermissions = true;
break;
case 'saslScram512Auth':
type = 'SASL_SCRAM_512_AUTH';
needsSecretsManagerPermissions = true;
break;
default:
type = accessConfigurationType;
}
accessConfigurationValues.forEach((accessConfigurationValue) => {
if (needsSecretsManagerPermissions) {
secretsManagerStatement.Resource.push(accessConfigurationValue);
}
kafkaResource.Properties.SourceAccessConfigurations.push({
Type: type,
URI: `${prefix}${accessConfigurationValue}`,
});
});
}
);
if (batchSize) {
kafkaResource.Properties.BatchSize = batchSize;
}
if (enabled != null) {
kafkaResource.Properties.Enabled = enabled;
}
cfTemplate.Resources[kafkaEventLogicalId] = kafkaResource;
});
// https://docs.aws.amazon.com/lambda/latest/dg/smaa-permissions.html
if (cfTemplate.Resources.IamRoleLambdaExecution && hasKafkaEvent) {
const statement =
cfTemplate.Resources.IamRoleLambdaExecution.Properties.Policies[0].PolicyDocument
.Statement;
if (secretsManagerStatement.Resource.length) {
statement.push(secretsManagerStatement);
}
if (needsEc2Permissions) {
statement.push(ec2Statement);
}
}
});
}
}
module.exports = AwsCompileKafkaEvents;

View File

@ -40,6 +40,7 @@ module.exports = [
require('./aws/package/compile/events/websockets/index.js'),
require('./aws/package/compile/events/sns.js'),
require('./aws/package/compile/events/stream.js'),
require('./aws/package/compile/events/kafka.js'),
require('./aws/package/compile/events/msk/index.js'),
require('./aws/package/compile/events/alb/index.js'),
require('./aws/package/compile/events/alexaSkill.js'),

View File

@ -727,6 +727,25 @@ describe('#naming()', () => {
});
});
describe('#getKafkaEventLogicalId()', () => {
it('should normalize the function name and append topic name', () => {
expect(sdk.naming.getKafkaEventLogicalId('functionName', 'kafka-topic')).to.equal(
'FunctionNameEventSourceMappingKafkaKafkatopic'
);
});
it('should normalize long function name and append topic name', () => {
expect(
sdk.naming.getKafkaEventLogicalId(
'functionName',
'myVeryLongTopicNamemyVeryLongTopicNamemyVeryLongTopicNamemyVeryLongTopicNamemyVeryLongTopicNamemyVeryLongTopicNamemyVeryLongTopicNamemyVeryLongTopicNamemyVeryLongTopicNamemyVeryLongTopicName'
)
).to.equal(
'FunctionNameEventSourceMappingKafkaMyVeryLongTopicNamemyVeryLongTopicNamemyVeryLongTopicNamemyVeryLongTopicNamemyVeryLongTopicNamemyVeryLongTopicNamemyVeryLongTopicNamemyVeryLongTopicNamemyVery'
);
});
});
describe('#getMSKEventLogicalId()', () => {
it('should normalize the function name and append normalized cluster and topic names', () => {
expect(

View File

@ -0,0 +1,434 @@
'use strict';
const chai = require('chai');
const runServerless = require('../../../../../../../utils/run-serverless');
const { expect } = chai;
chai.use(require('chai-as-promised'));
describe('test/unit/lib/plugins/aws/package/compile/events/kafka.test.js', () => {
const saslScram256AuthArn =
'arn:aws:secretsmanager:us-east-1:01234567890:secret:MyBrokerSecretName';
const topic = 'TestingTopic';
const enabled = false;
const startingPosition = 'LATEST';
const batchSize = 5000;
describe('when there are kafka events defined', () => {
let minimalEventSourceMappingResource;
let allParamsEventSourceMappingResource;
let defaultIamRole;
let naming;
before(async () => {
const { awsNaming, cfTemplate } = await runServerless({
fixture: 'function',
configExt: {
functions: {
foo: {
events: [
{
kafka: {
topic,
bootstrapServers: ['abc.xyz:9092'],
accessConfigurations: { saslScram256Auth: saslScram256AuthArn },
},
},
],
},
other: {
events: [
{
kafka: {
topic,
bootstrapServers: ['abc.xyz:9092'],
accessConfigurations: { saslScram256Auth: saslScram256AuthArn },
batchSize,
enabled,
startingPosition,
},
},
],
},
},
},
cliArgs: ['package'],
});
naming = awsNaming;
minimalEventSourceMappingResource =
cfTemplate.Resources[naming.getKafkaEventLogicalId('foo', 'TestingTopic')];
allParamsEventSourceMappingResource =
cfTemplate.Resources[naming.getKafkaEventLogicalId('other', 'TestingTopic')];
defaultIamRole = cfTemplate.Resources.IamRoleLambdaExecution;
});
it('should correctly compile EventSourceMapping resource properties with minimal configuration', () => {
expect(minimalEventSourceMappingResource.Properties).to.deep.equal({
SelfManagedEventSource: {
Endpoints: {
KafkaBootstrapServers: ['abc.xyz:9092'],
},
},
SourceAccessConfigurations: [
{
Type: 'SASL_SCRAM_256_AUTH',
URI: saslScram256AuthArn,
},
],
StartingPosition: 'TRIM_HORIZON',
Topics: [topic],
FunctionName: {
'Fn::GetAtt': [naming.getLambdaLogicalId('foo'), 'Arn'],
},
});
});
it('should update default IAM role with SecretsManager statement', () => {
expect(defaultIamRole.Properties.Policies[0].PolicyDocument.Statement).to.deep.include({
Effect: 'Allow',
Action: ['secretsmanager:GetSecretValue'],
Resource: [saslScram256AuthArn],
});
});
it('should correctly compile EventSourceMapping resource DependsOn ', () => {
expect(minimalEventSourceMappingResource.DependsOn).to.equal('IamRoleLambdaExecution');
expect(allParamsEventSourceMappingResource.DependsOn).to.equal('IamRoleLambdaExecution');
});
it('should correctly compile EventSourceMapping resource with all parameters', () => {
expect(allParamsEventSourceMappingResource.Properties).to.deep.equal({
BatchSize: batchSize,
Enabled: enabled,
SelfManagedEventSource: {
Endpoints: {
KafkaBootstrapServers: ['abc.xyz:9092'],
},
},
SourceAccessConfigurations: [
{
Type: 'SASL_SCRAM_256_AUTH',
URI: saslScram256AuthArn,
},
],
StartingPosition: startingPosition,
Topics: [topic],
FunctionName: {
'Fn::GetAtt': [naming.getLambdaLogicalId('other'), 'Arn'],
},
});
});
});
describe('configuring kafka events', () => {
const runCompileEventSourceMappingTest = async (eventConfig) => {
const { awsNaming, cfTemplate } = await runServerless({
fixture: 'function',
configExt: {
functions: {
foo: {
events: [
{
kafka: eventConfig.event,
},
],
},
},
},
cliArgs: ['package'],
});
const eventSourceMappingResource =
cfTemplate.Resources[awsNaming.getKafkaEventLogicalId('foo', 'TestingTopic')];
expect(eventSourceMappingResource.Properties).to.deep.equal(eventConfig.resource(awsNaming));
};
it('should correctly compile EventSourceMapping resource properties for VPC_SECURITY_GROUP', async () => {
const eventConfig = {
event: {
topic,
bootstrapServers: ['abc.xyz:9092'],
accessConfigurations: { vpcSecurityGroup: 'sg-abc4567890' },
},
resource: (awsNaming) => {
return {
SelfManagedEventSource: {
Endpoints: {
KafkaBootstrapServers: ['abc.xyz:9092'],
},
},
SourceAccessConfigurations: [
{
Type: 'VPC_SECURITY_GROUP',
URI: 'security_group:sg-abc4567890',
},
],
StartingPosition: 'TRIM_HORIZON',
Topics: [topic],
FunctionName: {
'Fn::GetAtt': [awsNaming.getLambdaLogicalId('foo'), 'Arn'],
},
};
},
};
await runCompileEventSourceMappingTest(eventConfig);
});
it('should correctly compile EventSourceMapping resource properties for VPC_SUBNET', async () => {
const eventConfig = {
event: {
topic,
bootstrapServers: ['abc.xyz:9092'],
accessConfigurations: { vpcSubnet: 'subnet-abc4567890' },
},
resource: (awsNaming) => {
return {
SelfManagedEventSource: {
Endpoints: {
KafkaBootstrapServers: ['abc.xyz:9092'],
},
},
SourceAccessConfigurations: [
{
Type: 'VPC_SUBNET',
URI: 'subnet:subnet-abc4567890',
},
],
StartingPosition: 'TRIM_HORIZON',
Topics: [topic],
FunctionName: {
'Fn::GetAtt': [awsNaming.getLambdaLogicalId('foo'), 'Arn'],
},
};
},
};
await runCompileEventSourceMappingTest(eventConfig);
});
it('should correctly compile EventSourceMapping resource properties for multiple VPC_SUBNETs', async () => {
const eventConfig = {
event: {
topic,
bootstrapServers: ['abc.xyz:9092'],
accessConfigurations: { vpcSubnet: ['subnet-0011001100', 'subnet-0022002200'] },
},
resource: (awsNaming) => {
return {
SelfManagedEventSource: {
Endpoints: {
KafkaBootstrapServers: ['abc.xyz:9092'],
},
},
SourceAccessConfigurations: [
{
Type: 'VPC_SUBNET',
URI: 'subnet:subnet-0011001100',
},
{
Type: 'VPC_SUBNET',
URI: 'subnet:subnet-0022002200',
},
],
StartingPosition: 'TRIM_HORIZON',
Topics: [topic],
FunctionName: {
'Fn::GetAtt': [awsNaming.getLambdaLogicalId('foo'), 'Arn'],
},
};
},
};
await runCompileEventSourceMappingTest(eventConfig);
});
it('should correctly compile EventSourceMapping resource properties for SASL_SCRAM_256_AUTH', async () => {
const eventConfig = {
event: {
topic,
bootstrapServers: ['abc.xyz:9092'],
accessConfigurations: { saslScram256Auth: saslScram256AuthArn },
},
resource: (awsNaming) => {
return {
SelfManagedEventSource: {
Endpoints: {
KafkaBootstrapServers: ['abc.xyz:9092'],
},
},
SourceAccessConfigurations: [
{
Type: 'SASL_SCRAM_256_AUTH',
URI: saslScram256AuthArn,
},
],
StartingPosition: 'TRIM_HORIZON',
Topics: [topic],
FunctionName: {
'Fn::GetAtt': [awsNaming.getLambdaLogicalId('foo'), 'Arn'],
},
};
},
};
await runCompileEventSourceMappingTest(eventConfig);
});
it('should correctly compile EventSourceMapping resource properties for SASL_SCRAM_256_AUTH', async () => {
const eventConfig = {
event: {
topic,
bootstrapServers: ['abc.xyz:9092'],
accessConfigurations: {
saslScram256Auth:
'arn:aws:secretsmanager:us-east-1:01234567890:secret:SaslScram256SecretName',
},
},
resource: (awsNaming) => {
return {
SelfManagedEventSource: {
Endpoints: {
KafkaBootstrapServers: ['abc.xyz:9092'],
},
},
SourceAccessConfigurations: [
{
Type: 'SASL_SCRAM_256_AUTH',
URI: 'arn:aws:secretsmanager:us-east-1:01234567890:secret:SaslScram256SecretName',
},
],
StartingPosition: 'TRIM_HORIZON',
Topics: [topic],
FunctionName: {
'Fn::GetAtt': [awsNaming.getLambdaLogicalId('foo'), 'Arn'],
},
};
},
};
await runCompileEventSourceMappingTest(eventConfig);
});
it('should correctly compile EventSourceMapping resource properties for SASL_SCRAM_512_AUTH', async () => {
const eventConfig = {
event: {
topic,
bootstrapServers: ['abc.xyz:9092'],
accessConfigurations: {
saslScram512Auth:
'arn:aws:secretsmanager:us-east-1:01234567890:secret:SaslScram512SecretName',
},
},
resource: (awsNaming) => {
return {
SelfManagedEventSource: {
Endpoints: {
KafkaBootstrapServers: ['abc.xyz:9092'],
},
},
SourceAccessConfigurations: [
{
Type: 'SASL_SCRAM_512_AUTH',
URI: 'arn:aws:secretsmanager:us-east-1:01234567890:secret:SaslScram512SecretName',
},
],
StartingPosition: 'TRIM_HORIZON',
Topics: [topic],
FunctionName: {
'Fn::GetAtt': [awsNaming.getLambdaLogicalId('foo'), 'Arn'],
},
};
},
};
await runCompileEventSourceMappingTest(eventConfig);
});
it('should update default IAM role with EC2 statement', async () => {
const { cfTemplate } = await runServerless({
fixture: 'function',
configExt: {
functions: {
foo: {
events: [
{
kafka: {
topic,
bootstrapServers: ['abc.xyz:9092'],
accessConfigurations: { vpcSecurityGroup: 'sg-abc4567890' },
},
},
],
},
},
},
cliArgs: ['package'],
});
const defaultIamRole = cfTemplate.Resources.IamRoleLambdaExecution;
expect(defaultIamRole.Properties.Policies[0].PolicyDocument.Statement).to.deep.include({
Effect: 'Allow',
Action: [
'ec2:CreateNetworkInterface',
'ec2:DescribeNetworkInterfaces',
'ec2:DescribeVpcs',
'ec2:DeleteNetworkInterface',
'ec2:DescribeSubnets',
'ec2:DescribeSecurityGroups',
],
Resource: '*',
});
});
it('should not add dependsOn for imported role', async () => {
const { awsNaming, cfTemplate } = await runServerless({
fixture: 'function',
configExt: {
functions: {
foo: {
role: { 'Fn::ImportValue': 'MyImportedRole' },
events: [
{
kafka: {
topic,
bootstrapServers: ['abc.xyz:9092'],
accessConfigurations: { saslScram256Auth: saslScram256AuthArn },
},
},
],
},
},
},
cliArgs: ['package'],
});
const eventSourceMappingResource =
cfTemplate.Resources[awsNaming.getKafkaEventLogicalId('foo', 'TestingTopic')];
expect(eventSourceMappingResource.DependsOn).to.deep.equal([]);
});
});
describe('when no kafka events are defined', () => {
it('should not modify the default IAM role', async () => {
const { cfTemplate } = await runServerless({
fixture: 'function',
cliArgs: ['package'],
});
const defaultIamRole = cfTemplate.Resources.IamRoleLambdaExecution;
expect(defaultIamRole.Properties.Policies[0].PolicyDocument.Statement).not.to.deep.include({
Effect: 'Allow',
Action: ['secretsmanager:GetSecretValue'],
Resource: [saslScram256AuthArn],
});
expect(defaultIamRole.Properties.Policies[0].PolicyDocument.Statement).not.to.deep.include({
Effect: 'Allow',
Action: [
'ec2:CreateNetworkInterface',
'ec2:DescribeNetworkInterfaces',
'ec2:DescribeVpcs',
'ec2:DeleteNetworkInterface',
'ec2:DescribeSubnets',
'ec2:DescribeSecurityGroups',
],
Resource: '*',
});
});
});
});