feat: Support destinations config on stream events (#7262)

This commit is contained in:
Sydney Kereliuk 2020-01-30 14:17:11 -05:00 committed by GitHub
parent 4468805d2a
commit ea4ac262ea
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 677 additions and 1 deletions

View File

@ -153,6 +153,72 @@ functions:
enabled: false
```
## Setting the OnFailure destination
This configuration sets up the onFailure location for events to be sent to once it has reached the maximum number of times to retry when the function returns an error.
**Note:** Serverless only sets this property if you explicitly add it to the stream configuration (see example below).
[Related AWS documentation](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-lambda-eventsourcemapping.html#cfn-lambda-eventsourcemapping-destinationconfig)
The ARN for the SNS or SQS can be specified as a string, the reference to the ARN of a resource by logical ID, or the import of an ARN that was exported by a different service or CloudFormation stack.
**Note:** The `destinationConfig` will hook up your existing SNS or SQS resources. Serverless won't create a new SNS or SQS for you.
```yml
functions:
preprocess1:
handler: handler.preprocess
events:
- stream:
arn: arn:aws:kinesis:region:XXXXXX:stream/foo
batchSize: 100
maximumRetryAttempts: 10
startingPosition: LATEST
enabled: false
destinations:
onFailure: arn:aws:sqs:region:XXXXXX:queue
preprocess2:
handler: handler.preprocess
events:
- stream:
arn: arn:aws:kinesis:region:XXXXXX:stream/foo
batchSize: 100
maximumRetryAttempts: 10
startingPosition: LATEST
enabled: false
destinations:
onFailure:
arn:
Fn::GetAtt:
- MyQueue
- Arn
type: sqs
preprocess3:
handler: handler.preprocess
events:
- stream:
arn: arn:aws:kinesis:region:XXXXXX:stream/foo
batchSize: 100
maximumRetryAttempts: 10
startingPosition: LATEST
enabled: false
destinations:
onFailure:
arn:
Fn::Join:
- ':'
- - arn
- aws
- kinesis
- Ref: AWS::Region
- Ref: AWS::AccountId
- mySnsTopic
type: sns
```
## Setting the ParallelizationFactor
The configuration below sets up a Kinesis stream event for the `preprocess` function which has a parallelization factor of 10 (default is 1).

View File

@ -12,6 +12,34 @@ class AwsCompileStreamEvents {
};
}
isValidStackImport(variable) {
if (Object.keys(variable).length !== 1) {
return false;
}
if (
variable['Fn::ImportValue'] &&
(variable['Fn::ImportValue']['Fn::GetAtt'] || variable['Fn::ImportValue'].Ref)
) {
return false;
}
const intrinsicFunctions = ['Fn::ImportValue', 'Ref', 'Fn::GetAtt', 'Fn::Sub', 'Fn::Join'];
return intrinsicFunctions.some(cfInstructionName => variable[cfInstructionName] !== undefined);
}
resolveInvalidDestinationPropertyErrorMessage(functionName, property) {
return [
`Missing or invalid ${property} property for on failure destination`,
` in function "${functionName}"`,
'The correct syntax is: ',
'destinations: ',
' onFailure: ',
' arn: resource-arn',
' type: (sns/sqs)',
'OR an object with arn and type',
'Please check the docs for more info.',
].join('\n');
}
compileStreamEvents() {
this.serverless.service.getAllFunctions().forEach(functionName => {
const functionObj = this.serverless.service.getFunction(functionName);
@ -37,6 +65,16 @@ class AwsCompileStreamEvents {
],
Resource: [],
};
const onFailureSnsStatement = {
Effect: 'Allow',
Action: ['sns:Publish'],
Resource: [],
};
const onFailureSqsStatement = {
Effect: 'Allow',
Action: ['sqs:ListQueues', 'sqs:SendMessage'],
Resource: [],
};
functionObj.events.forEach(event => {
if (event.stream) {
@ -207,6 +245,84 @@ class AwsCompileStreamEvents {
streamResource.Properties.BisectBatchOnFunctionError = true;
}
if (event.stream.destinations) {
if (event.stream.destinations.onFailure) {
let OnFailureDestinationArn;
if (typeof event.stream.destinations.onFailure === 'object') {
if (!event.stream.destinations.onFailure.arn) {
throw new this.serverless.classes.Error(
this.resolveInvalidDestinationPropertyErrorMessage(functionName, 'arn')
);
}
if (typeof event.stream.destinations.onFailure.arn !== 'string') {
if (!event.stream.destinations.onFailure.type) {
const errorMessage = [
`Missing "type" property for on failure destination in function "${functionName}"`,
' If the "arn" property on a destination is a complex type (such as Fn::GetAtt)',
' then a "type" must be provided for the destination, either "sns" or,',
' "sqs". Please check the docs for more info.',
].join('');
throw new this.serverless.classes.Error(errorMessage);
}
if (!this.isValidStackImport(event.stream.destinations.onFailure.arn)) {
throw new this.serverless.classes.Error(
this.resolveInvalidDestinationPropertyErrorMessage(functionName, 'arn')
);
}
}
if (
typeof event.stream.destinations.onFailure.arn === 'string' &&
!event.stream.destinations.onFailure.arn.startsWith('arn:')
) {
throw new this.serverless.classes.Error(
this.resolveInvalidDestinationPropertyErrorMessage(functionName, 'arn')
);
}
OnFailureDestinationArn = event.stream.destinations.onFailure.arn;
} else if (typeof event.stream.destinations.onFailure === 'string') {
if (!event.stream.destinations.onFailure.startsWith('arn:')) {
throw new this.serverless.classes.Error(
this.resolveInvalidDestinationPropertyErrorMessage(functionName, 'arn')
);
}
OnFailureDestinationArn = event.stream.destinations.onFailure;
} else {
throw new this.serverless.classes.Error(
this.resolveInvalidDestinationPropertyErrorMessage(functionName, 'arn')
);
}
const destinationType =
event.stream.destinations.onFailure.type || OnFailureDestinationArn.split(':')[2];
// add on failure destination ARNs to PolicyDocument statements
if (destinationType === 'sns') {
onFailureSnsStatement.Resource.push(OnFailureDestinationArn);
} else if (destinationType === 'sqs') {
onFailureSqsStatement.Resource.push(OnFailureDestinationArn);
} else {
const errorMessage = [
`Stream event of function '${functionName}' had unsupported destination type of`,
` '${streamType}'. Valid stream event source types include 'sns' and`,
" 'sqs'. Please check the docs for more info.",
].join('');
throw new this.serverless.classes.Properties.Policies[0].PolicyDocument.Error(
errorMessage
);
}
streamResource.Properties.DestinationConfig = {
OnFailure: {
Destination: OnFailureDestinationArn,
},
};
} else {
throw new this.serverless.classes.Error(
this.resolveInvalidDestinationPropertyErrorMessage(functionName, 'onFailure')
);
}
}
const newStreamObject = {
[streamLogicalId]: streamResource,
};
@ -231,6 +347,12 @@ class AwsCompileStreamEvents {
if (kinesisStreamStatement.Resource.length) {
statement.push(kinesisStreamStatement);
}
if (onFailureSnsStatement.Resource.length) {
statement.push(onFailureSnsStatement);
}
if (onFailureSqsStatement.Resource.length) {
statement.push(onFailureSqsStatement);
}
}
}
});

View File

@ -385,6 +385,14 @@ describe('AwsCompileStreamEvents', () => {
bisectBatchOnFunctionError: true,
},
},
{
stream: {
arn: 'arn:aws:dynamodb:region:account:table/fizz/stream/5',
destinations: {
onFailure: 'arn:aws:sns:region:account:snstopic',
},
},
},
],
},
};
@ -515,6 +523,40 @@ describe('AwsCompileStreamEvents', () => {
awsCompileStreamEvents.serverless.service.provider.compiledCloudFormationTemplate
.Resources.FirstEventSourceMappingDynamodbBuzz.Properties.BisectBatchOnFunctionError
).to.equal(true);
// event 5
expect(
awsCompileStreamEvents.serverless.service.provider.compiledCloudFormationTemplate
.Resources.FirstEventSourceMappingDynamodbFizz.Type
).to.equal('AWS::Lambda::EventSourceMapping');
expect(
awsCompileStreamEvents.serverless.service.provider.compiledCloudFormationTemplate
.Resources.FirstEventSourceMappingDynamodbFizz.DependsOn
).to.equal('IamRoleLambdaExecution');
expect(
awsCompileStreamEvents.serverless.service.provider.compiledCloudFormationTemplate
.Resources.FirstEventSourceMappingDynamodbFizz.Properties.EventSourceArn
).to.equal(awsCompileStreamEvents.serverless.service.functions.first.events[4].stream.arn);
expect(
awsCompileStreamEvents.serverless.service.provider.compiledCloudFormationTemplate
.Resources.FirstEventSourceMappingDynamodbFizz.Properties.BatchSize
).to.equal(10);
expect(
awsCompileStreamEvents.serverless.service.provider.compiledCloudFormationTemplate
.Resources.FirstEventSourceMappingDynamodbFizz.Properties.StartingPosition
).to.equal('TRIM_HORIZON');
expect(
awsCompileStreamEvents.serverless.service.provider.compiledCloudFormationTemplate
.Resources.FirstEventSourceMappingDynamodbFizz.Properties.Enabled
).to.equal('True');
expect(
awsCompileStreamEvents.serverless.service.provider.compiledCloudFormationTemplate
.Resources.FirstEventSourceMappingDynamodbFizz.Properties.DestinationConfig.OnFailure
.Destination
).to.equal(
awsCompileStreamEvents.serverless.service.functions.first.events[4].stream.destinations
.onFailure
);
});
it('should allow specifying DynamoDB and Kinesis streams as CFN reference types', () => {
@ -674,6 +716,181 @@ describe('AwsCompileStreamEvents', () => {
});
});
it('should allow specifying OnFailure destinations as CFN reference types', () => {
awsCompileStreamEvents.serverless.service.resources.Parameters = {
SomeSNSArn: {
Type: 'String',
},
ForeignSQSArn: {
Type: 'String',
},
};
awsCompileStreamEvents.serverless.service.functions = {
first: {
events: [
{
stream: {
arn: 'arn:aws:dynamodb:region:account:table/foo/stream/1',
destinations: {
onFailure: {
arn: { 'Fn::GetAtt': ['SomeSNS', 'Arn'] },
type: 'sns',
},
},
},
},
{
stream: {
arn: 'arn:aws:dynamodb:region:account:table/bar/stream/1',
destinations: {
onFailure: {
arn: { 'Fn::ImportValue': 'ForeignSQS' },
type: 'sqs',
},
},
},
},
{
stream: {
arn: 'arn:aws:dynamodb:region:account:table/baz/stream/1',
destinations: {
onFailure: {
arn: {
'Fn::Join': [
':',
[
'arn',
'aws',
'sqs',
{
Ref: 'AWS::Region',
},
{
Ref: 'AWS::AccountId',
},
'MyQueue',
],
],
},
type: 'sqs',
},
},
},
},
{
stream: {
arn: 'arn:aws:dynamodb:region:account:table/buzz/stream/1',
destinations: {
onFailure: {
arn: { Ref: 'SomeSNSArn' },
type: 'sns',
},
},
},
},
{
stream: {
arn: 'arn:aws:dynamodb:region:account:table/fizz/stream/1',
destinations: {
onFailure: {
arn: { Ref: 'ForeignSQSArn' },
type: 'sqs',
},
},
},
},
],
},
};
awsCompileStreamEvents.compileStreamEvents();
// sns with Fn::GetAtt
expect(
awsCompileStreamEvents.serverless.service.provider.compiledCloudFormationTemplate
.Resources.FirstEventSourceMappingDynamodbFoo.Properties.DestinationConfig.OnFailure
.Destination
).to.deep.equal({ 'Fn::GetAtt': ['SomeSNS', 'Arn'] });
expect(
awsCompileStreamEvents.serverless.service.provider.compiledCloudFormationTemplate
.Resources.IamRoleLambdaExecution.Properties.Policies[0].PolicyDocument.Statement[1]
).to.deep.equal({
Action: ['sns:Publish'],
Effect: 'Allow',
Resource: [
{
'Fn::GetAtt': ['SomeSNS', 'Arn'],
},
{
Ref: 'SomeSNSArn',
},
],
});
// sqs with Fn::ImportValue
expect(
awsCompileStreamEvents.serverless.service.provider.compiledCloudFormationTemplate
.Resources.FirstEventSourceMappingDynamodbBar.Properties.DestinationConfig.OnFailure
.Destination
).to.deep.equal({ 'Fn::ImportValue': 'ForeignSQS' });
// sqs with Fn::Join
expect(
awsCompileStreamEvents.serverless.service.provider.compiledCloudFormationTemplate
.Resources.FirstEventSourceMappingDynamodbBaz.Properties.DestinationConfig.OnFailure
.Destination
).to.deep.equal({
'Fn::Join': [
':',
[
'arn',
'aws',
'sqs',
{
Ref: 'AWS::Region',
},
{
Ref: 'AWS::AccountId',
},
'MyQueue',
],
],
});
expect(
awsCompileStreamEvents.serverless.service.provider.compiledCloudFormationTemplate
.Resources.IamRoleLambdaExecution.Properties.Policies[0].PolicyDocument.Statement[2]
).to.deep.equal({
Effect: 'Allow',
Action: ['sqs:ListQueues', 'sqs:SendMessage'],
Resource: [
{
'Fn::ImportValue': 'ForeignSQS',
},
{
'Fn::Join': [
':',
[
'arn',
'aws',
'sqs',
{
Ref: 'AWS::Region',
},
{
Ref: 'AWS::AccountId',
},
'MyQueue',
],
],
},
{
Ref: 'ForeignSQSArn',
},
],
});
});
it('fails if Ref/dynamic stream ARN is used without defining it to the CF parameters', () => {
awsCompileStreamEvents.serverless.service.functions = {
first: {
@ -690,6 +907,27 @@ describe('AwsCompileStreamEvents', () => {
expect(() => awsCompileStreamEvents.compileStreamEvents()).to.throw(Error);
});
it('fails if Ref/dynamic onFailure ARN is used without defining it to the CF parameters', () => {
awsCompileStreamEvents.serverless.service.functions = {
first: {
events: [
{
stream: {
arn: 'arn:aws:dynamodb:region:account:table/fizz/stream/1',
destinations: {
onFailure: {
arn: { Ref: 'ForeignSQSArn' },
},
},
},
},
],
},
};
expect(() => awsCompileStreamEvents.compileStreamEvents()).to.throw(Error);
});
it('fails if Fn::GetAtt/dynamic stream ARN is used without a type', () => {
awsCompileStreamEvents.serverless.service.functions = {
first: {
@ -706,6 +944,27 @@ describe('AwsCompileStreamEvents', () => {
expect(() => awsCompileStreamEvents.compileStreamEvents()).to.throw(Error);
});
it('fails if Fn::GetAtt/dynamic onFailure ARN is used without a type', () => {
awsCompileStreamEvents.serverless.service.functions = {
first: {
events: [
{
stream: {
arn: 'arn:aws:dynamodb:region:account:table/foo/stream/1',
destinations: {
onFailure: {
arn: { 'Fn::GetAtt': ['SomeSNS', 'Arn'] },
},
},
},
},
],
},
};
expect(() => awsCompileStreamEvents.compileStreamEvents()).to.throw(Error);
});
it('fails if keys other than Fn::GetAtt/ImportValue/Join are used for dynamic stream ARN', () => {
awsCompileStreamEvents.serverless.service.functions = {
first: {
@ -726,6 +985,183 @@ describe('AwsCompileStreamEvents', () => {
expect(() => awsCompileStreamEvents.compileStreamEvents()).to.throw(Error);
});
it('fails if keys other than Fn::GetAtt/ImportValue/Join are used for dynamic onFailure ARN', () => {
awsCompileStreamEvents.serverless.service.functions = {
first: {
events: [
{
stream: {
arn: 'arn:aws:dynamodb:region:account:table/foo/stream/1',
destinations: {
onFailure: {
arn: {
'Fn::GetAtt': ['SomeSNS', 'Arn'],
'batchSize': 1,
},
type: 'sns',
},
},
},
},
],
},
};
expect(() => awsCompileStreamEvents.compileStreamEvents()).to.throw(Error);
});
it('fails if Fn::ImportValue is misused for onFailure ARN', () => {
awsCompileStreamEvents.serverless.service.functions = {
first: {
events: [
{
stream: {
arn: 'arn:aws:dynamodb:region:account:table/foo/stream/1',
destinations: {
onFailure: {
arn: {
'Fn::ImportValue': {
'Fn::GetAtt': ['SomeSNS', 'Arn'],
},
},
type: 'invalidType',
},
},
},
},
],
},
};
expect(() => awsCompileStreamEvents.compileStreamEvents()).to.throw(Error);
});
it('fails if onFailure ARN is given as a string that does not start with arn', () => {
awsCompileStreamEvents.serverless.service.functions = {
first: {
events: [
{
stream: {
arn: 'arn:aws:dynamodb:region:account:table/foo/stream/1',
destinations: {
onFailure: 'invalidARN',
},
},
},
],
},
};
expect(() => awsCompileStreamEvents.compileStreamEvents()).to.throw(Error);
});
it('fails if onFailure ARN is given as a variable type other than string or object', () => {
awsCompileStreamEvents.serverless.service.functions = {
first: {
events: [
{
stream: {
arn: 'arn:aws:dynamodb:region:account:table/foo/stream/1',
destinations: {
onFailure: 3,
},
},
},
],
},
};
expect(() => awsCompileStreamEvents.compileStreamEvents()).to.throw(Error);
});
it('fails if nested onFailure ARN is given as a string that does not start with arn', () => {
awsCompileStreamEvents.serverless.service.functions = {
first: {
events: [
{
stream: {
arn: 'arn:aws:dynamodb:region:account:table/foo/stream/1',
destinations: {
onFailure: {
arn: 'invalidARN',
},
},
},
},
],
},
};
expect(() => awsCompileStreamEvents.compileStreamEvents()).to.throw(Error);
});
it('fails if no arn key is given for a dynamic onFailure ARN', () => {
awsCompileStreamEvents.serverless.service.functions = {
first: {
events: [
{
stream: {
arn: 'arn:aws:dynamodb:region:account:table/foo/stream/1',
destinations: {
onFailure: {
notarn: ['SomeSNS', 'Arn'],
},
},
},
},
],
},
};
expect(() => awsCompileStreamEvents.compileStreamEvents()).to.throw(Error);
});
it('fails if destinations structure is wrong', () => {
awsCompileStreamEvents.serverless.service.functions = {
first: {
events: [
{
stream: {
arn: 'arn:aws:dynamodb:region:account:table/foo/stream/1',
destinations: {
notOnFailure: {
arn: {
'Fn::GetAtt': ['SomeSNS', 'Arn'],
'batchSize': 1,
},
},
},
},
},
],
},
};
expect(() => awsCompileStreamEvents.compileStreamEvents()).to.throw(Error);
});
it('fails if invalid onFailure type is given', () => {
awsCompileStreamEvents.serverless.service.functions = {
first: {
events: [
{
stream: {
arn: 'arn:aws:dynamodb:region:account:table/foo/stream/1',
destinations: {
onFailure: {
arn: { 'Fn::GetAtt': ['SomeSNS', 'Arn'] },
type: 'invalidType',
},
},
},
},
],
},
};
expect(() => awsCompileStreamEvents.compileStreamEvents()).to.throw(Error);
});
it('should add the necessary IAM role statements', () => {
awsCompileStreamEvents.serverless.service.functions = {
first: {
@ -734,7 +1170,12 @@ describe('AwsCompileStreamEvents', () => {
stream: 'arn:aws:dynamodb:region:account:table/foo/stream/1',
},
{
stream: 'arn:aws:dynamodb:region:account:table/bar/stream/2',
stream: {
arn: 'arn:aws:dynamodb:region:account:table/bar/stream/2',
destinations: {
onFailure: 'arn:aws:sns:region:account:snstopic',
},
},
},
],
},
@ -754,6 +1195,11 @@ describe('AwsCompileStreamEvents', () => {
'arn:aws:dynamodb:region:account:table/bar/stream/2',
],
},
{
Effect: 'Allow',
Action: ['sns:Publish'],
Resource: ['arn:aws:sns:region:account:snstopic'],
},
];
awsCompileStreamEvents.compileStreamEvents();
@ -795,6 +1241,14 @@ describe('AwsCompileStreamEvents', () => {
bisectBatchOnFunctionError: true,
},
},
{
stream: {
arn: 'arn:aws:kinesis:region:account:table/fizz/stream/5',
destinations: {
onFailure: 'arn:aws:sns:region:account:snstopic',
},
},
},
],
},
};
@ -940,6 +1394,40 @@ describe('AwsCompileStreamEvents', () => {
awsCompileStreamEvents.serverless.service.provider.compiledCloudFormationTemplate
.Resources.FirstEventSourceMappingKinesisBuzz.Properties.BisectBatchOnFunctionError
).to.equal(true);
// event 5
expect(
awsCompileStreamEvents.serverless.service.provider.compiledCloudFormationTemplate
.Resources.FirstEventSourceMappingKinesisFizz.Type
).to.equal('AWS::Lambda::EventSourceMapping');
expect(
awsCompileStreamEvents.serverless.service.provider.compiledCloudFormationTemplate
.Resources.FirstEventSourceMappingKinesisFizz.DependsOn
).to.equal('IamRoleLambdaExecution');
expect(
awsCompileStreamEvents.serverless.service.provider.compiledCloudFormationTemplate
.Resources.FirstEventSourceMappingKinesisFizz.Properties.EventSourceArn
).to.equal(awsCompileStreamEvents.serverless.service.functions.first.events[4].stream.arn);
expect(
awsCompileStreamEvents.serverless.service.provider.compiledCloudFormationTemplate
.Resources.FirstEventSourceMappingKinesisFizz.Properties.BatchSize
).to.equal(10);
expect(
awsCompileStreamEvents.serverless.service.provider.compiledCloudFormationTemplate
.Resources.FirstEventSourceMappingKinesisFizz.Properties.StartingPosition
).to.equal('TRIM_HORIZON');
expect(
awsCompileStreamEvents.serverless.service.provider.compiledCloudFormationTemplate
.Resources.FirstEventSourceMappingKinesisFizz.Properties.Enabled
).to.equal('True');
expect(
awsCompileStreamEvents.serverless.service.provider.compiledCloudFormationTemplate
.Resources.FirstEventSourceMappingKinesisFizz.Properties.DestinationConfig.OnFailure
.Destination
).to.equal(
awsCompileStreamEvents.serverless.service.functions.first.events[4].stream.destinations
.onFailure
);
});
it('should add the necessary IAM role statements', () => {