diff --git a/docs/providers/aws/events/streams.md b/docs/providers/aws/events/streams.md index 3a04e5a5e..e971e33ba 100644 --- a/docs/providers/aws/events/streams.md +++ b/docs/providers/aws/events/streams.md @@ -153,6 +153,72 @@ functions: enabled: false ``` +## Setting the OnFailure destination + +This configuration sets up the onFailure location for events to be sent to once it has reached the maximum number of times to retry when the function returns an error. + +**Note:** Serverless only sets this property if you explicitly add it to the stream configuration (see example below). + +[Related AWS documentation](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-lambda-eventsourcemapping.html#cfn-lambda-eventsourcemapping-destinationconfig) + +The ARN for the SNS or SQS can be specified as a string, the reference to the ARN of a resource by logical ID, or the import of an ARN that was exported by a different service or CloudFormation stack. + +**Note:** The `destinationConfig` will hook up your existing SNS or SQS resources. Serverless won't create a new SNS or SQS for you. + +```yml +functions: + preprocess1: + handler: handler.preprocess + events: + - stream: + arn: arn:aws:kinesis:region:XXXXXX:stream/foo + batchSize: 100 + maximumRetryAttempts: 10 + startingPosition: LATEST + enabled: false + destinations: + onFailure: arn:aws:sqs:region:XXXXXX:queue + + preprocess2: + handler: handler.preprocess + events: + - stream: + arn: arn:aws:kinesis:region:XXXXXX:stream/foo + batchSize: 100 + maximumRetryAttempts: 10 + startingPosition: LATEST + enabled: false + destinations: + onFailure: + arn: + Fn::GetAtt: + - MyQueue + - Arn + type: sqs + + preprocess3: + handler: handler.preprocess + events: + - stream: + arn: arn:aws:kinesis:region:XXXXXX:stream/foo + batchSize: 100 + maximumRetryAttempts: 10 + startingPosition: LATEST + enabled: false + destinations: + onFailure: + arn: + Fn::Join: + - ':' + - - arn + - aws + - kinesis + - Ref: AWS::Region + - Ref: AWS::AccountId + - mySnsTopic + type: sns +``` + ## Setting the ParallelizationFactor The configuration below sets up a Kinesis stream event for the `preprocess` function which has a parallelization factor of 10 (default is 1). diff --git a/lib/plugins/aws/package/compile/events/stream/index.js b/lib/plugins/aws/package/compile/events/stream/index.js index ff266ef35..d4cdd0443 100644 --- a/lib/plugins/aws/package/compile/events/stream/index.js +++ b/lib/plugins/aws/package/compile/events/stream/index.js @@ -12,6 +12,34 @@ class AwsCompileStreamEvents { }; } + isValidStackImport(variable) { + if (Object.keys(variable).length !== 1) { + return false; + } + if ( + variable['Fn::ImportValue'] && + (variable['Fn::ImportValue']['Fn::GetAtt'] || variable['Fn::ImportValue'].Ref) + ) { + return false; + } + const intrinsicFunctions = ['Fn::ImportValue', 'Ref', 'Fn::GetAtt', 'Fn::Sub', 'Fn::Join']; + return intrinsicFunctions.some(cfInstructionName => variable[cfInstructionName] !== undefined); + } + + resolveInvalidDestinationPropertyErrorMessage(functionName, property) { + return [ + `Missing or invalid ${property} property for on failure destination`, + ` in function "${functionName}"`, + 'The correct syntax is: ', + 'destinations: ', + ' onFailure: ', + ' arn: resource-arn', + ' type: (sns/sqs)', + 'OR an object with arn and type', + 'Please check the docs for more info.', + ].join('\n'); + } + compileStreamEvents() { this.serverless.service.getAllFunctions().forEach(functionName => { const functionObj = this.serverless.service.getFunction(functionName); @@ -37,6 +65,16 @@ class AwsCompileStreamEvents { ], Resource: [], }; + const onFailureSnsStatement = { + Effect: 'Allow', + Action: ['sns:Publish'], + Resource: [], + }; + const onFailureSqsStatement = { + Effect: 'Allow', + Action: ['sqs:ListQueues', 'sqs:SendMessage'], + Resource: [], + }; functionObj.events.forEach(event => { if (event.stream) { @@ -207,6 +245,84 @@ class AwsCompileStreamEvents { streamResource.Properties.BisectBatchOnFunctionError = true; } + if (event.stream.destinations) { + if (event.stream.destinations.onFailure) { + let OnFailureDestinationArn; + + if (typeof event.stream.destinations.onFailure === 'object') { + if (!event.stream.destinations.onFailure.arn) { + throw new this.serverless.classes.Error( + this.resolveInvalidDestinationPropertyErrorMessage(functionName, 'arn') + ); + } + if (typeof event.stream.destinations.onFailure.arn !== 'string') { + if (!event.stream.destinations.onFailure.type) { + const errorMessage = [ + `Missing "type" property for on failure destination in function "${functionName}"`, + ' If the "arn" property on a destination is a complex type (such as Fn::GetAtt)', + ' then a "type" must be provided for the destination, either "sns" or,', + ' "sqs". Please check the docs for more info.', + ].join(''); + throw new this.serverless.classes.Error(errorMessage); + } + if (!this.isValidStackImport(event.stream.destinations.onFailure.arn)) { + throw new this.serverless.classes.Error( + this.resolveInvalidDestinationPropertyErrorMessage(functionName, 'arn') + ); + } + } + if ( + typeof event.stream.destinations.onFailure.arn === 'string' && + !event.stream.destinations.onFailure.arn.startsWith('arn:') + ) { + throw new this.serverless.classes.Error( + this.resolveInvalidDestinationPropertyErrorMessage(functionName, 'arn') + ); + } + OnFailureDestinationArn = event.stream.destinations.onFailure.arn; + } else if (typeof event.stream.destinations.onFailure === 'string') { + if (!event.stream.destinations.onFailure.startsWith('arn:')) { + throw new this.serverless.classes.Error( + this.resolveInvalidDestinationPropertyErrorMessage(functionName, 'arn') + ); + } + OnFailureDestinationArn = event.stream.destinations.onFailure; + } else { + throw new this.serverless.classes.Error( + this.resolveInvalidDestinationPropertyErrorMessage(functionName, 'arn') + ); + } + + const destinationType = + event.stream.destinations.onFailure.type || OnFailureDestinationArn.split(':')[2]; + // add on failure destination ARNs to PolicyDocument statements + if (destinationType === 'sns') { + onFailureSnsStatement.Resource.push(OnFailureDestinationArn); + } else if (destinationType === 'sqs') { + onFailureSqsStatement.Resource.push(OnFailureDestinationArn); + } else { + const errorMessage = [ + `Stream event of function '${functionName}' had unsupported destination type of`, + ` '${streamType}'. Valid stream event source types include 'sns' and`, + " 'sqs'. Please check the docs for more info.", + ].join(''); + throw new this.serverless.classes.Properties.Policies[0].PolicyDocument.Error( + errorMessage + ); + } + + streamResource.Properties.DestinationConfig = { + OnFailure: { + Destination: OnFailureDestinationArn, + }, + }; + } else { + throw new this.serverless.classes.Error( + this.resolveInvalidDestinationPropertyErrorMessage(functionName, 'onFailure') + ); + } + } + const newStreamObject = { [streamLogicalId]: streamResource, }; @@ -231,6 +347,12 @@ class AwsCompileStreamEvents { if (kinesisStreamStatement.Resource.length) { statement.push(kinesisStreamStatement); } + if (onFailureSnsStatement.Resource.length) { + statement.push(onFailureSnsStatement); + } + if (onFailureSqsStatement.Resource.length) { + statement.push(onFailureSqsStatement); + } } } }); diff --git a/lib/plugins/aws/package/compile/events/stream/index.test.js b/lib/plugins/aws/package/compile/events/stream/index.test.js index b61ee8a12..e22c72b14 100644 --- a/lib/plugins/aws/package/compile/events/stream/index.test.js +++ b/lib/plugins/aws/package/compile/events/stream/index.test.js @@ -385,6 +385,14 @@ describe('AwsCompileStreamEvents', () => { bisectBatchOnFunctionError: true, }, }, + { + stream: { + arn: 'arn:aws:dynamodb:region:account:table/fizz/stream/5', + destinations: { + onFailure: 'arn:aws:sns:region:account:snstopic', + }, + }, + }, ], }, }; @@ -515,6 +523,40 @@ describe('AwsCompileStreamEvents', () => { awsCompileStreamEvents.serverless.service.provider.compiledCloudFormationTemplate .Resources.FirstEventSourceMappingDynamodbBuzz.Properties.BisectBatchOnFunctionError ).to.equal(true); + + // event 5 + expect( + awsCompileStreamEvents.serverless.service.provider.compiledCloudFormationTemplate + .Resources.FirstEventSourceMappingDynamodbFizz.Type + ).to.equal('AWS::Lambda::EventSourceMapping'); + expect( + awsCompileStreamEvents.serverless.service.provider.compiledCloudFormationTemplate + .Resources.FirstEventSourceMappingDynamodbFizz.DependsOn + ).to.equal('IamRoleLambdaExecution'); + expect( + awsCompileStreamEvents.serverless.service.provider.compiledCloudFormationTemplate + .Resources.FirstEventSourceMappingDynamodbFizz.Properties.EventSourceArn + ).to.equal(awsCompileStreamEvents.serverless.service.functions.first.events[4].stream.arn); + expect( + awsCompileStreamEvents.serverless.service.provider.compiledCloudFormationTemplate + .Resources.FirstEventSourceMappingDynamodbFizz.Properties.BatchSize + ).to.equal(10); + expect( + awsCompileStreamEvents.serverless.service.provider.compiledCloudFormationTemplate + .Resources.FirstEventSourceMappingDynamodbFizz.Properties.StartingPosition + ).to.equal('TRIM_HORIZON'); + expect( + awsCompileStreamEvents.serverless.service.provider.compiledCloudFormationTemplate + .Resources.FirstEventSourceMappingDynamodbFizz.Properties.Enabled + ).to.equal('True'); + expect( + awsCompileStreamEvents.serverless.service.provider.compiledCloudFormationTemplate + .Resources.FirstEventSourceMappingDynamodbFizz.Properties.DestinationConfig.OnFailure + .Destination + ).to.equal( + awsCompileStreamEvents.serverless.service.functions.first.events[4].stream.destinations + .onFailure + ); }); it('should allow specifying DynamoDB and Kinesis streams as CFN reference types', () => { @@ -674,6 +716,181 @@ describe('AwsCompileStreamEvents', () => { }); }); + it('should allow specifying OnFailure destinations as CFN reference types', () => { + awsCompileStreamEvents.serverless.service.resources.Parameters = { + SomeSNSArn: { + Type: 'String', + }, + ForeignSQSArn: { + Type: 'String', + }, + }; + awsCompileStreamEvents.serverless.service.functions = { + first: { + events: [ + { + stream: { + arn: 'arn:aws:dynamodb:region:account:table/foo/stream/1', + destinations: { + onFailure: { + arn: { 'Fn::GetAtt': ['SomeSNS', 'Arn'] }, + type: 'sns', + }, + }, + }, + }, + { + stream: { + arn: 'arn:aws:dynamodb:region:account:table/bar/stream/1', + destinations: { + onFailure: { + arn: { 'Fn::ImportValue': 'ForeignSQS' }, + type: 'sqs', + }, + }, + }, + }, + { + stream: { + arn: 'arn:aws:dynamodb:region:account:table/baz/stream/1', + destinations: { + onFailure: { + arn: { + 'Fn::Join': [ + ':', + [ + 'arn', + 'aws', + 'sqs', + { + Ref: 'AWS::Region', + }, + { + Ref: 'AWS::AccountId', + }, + 'MyQueue', + ], + ], + }, + type: 'sqs', + }, + }, + }, + }, + { + stream: { + arn: 'arn:aws:dynamodb:region:account:table/buzz/stream/1', + destinations: { + onFailure: { + arn: { Ref: 'SomeSNSArn' }, + type: 'sns', + }, + }, + }, + }, + { + stream: { + arn: 'arn:aws:dynamodb:region:account:table/fizz/stream/1', + destinations: { + onFailure: { + arn: { Ref: 'ForeignSQSArn' }, + type: 'sqs', + }, + }, + }, + }, + ], + }, + }; + + awsCompileStreamEvents.compileStreamEvents(); + + // sns with Fn::GetAtt + expect( + awsCompileStreamEvents.serverless.service.provider.compiledCloudFormationTemplate + .Resources.FirstEventSourceMappingDynamodbFoo.Properties.DestinationConfig.OnFailure + .Destination + ).to.deep.equal({ 'Fn::GetAtt': ['SomeSNS', 'Arn'] }); + expect( + awsCompileStreamEvents.serverless.service.provider.compiledCloudFormationTemplate + .Resources.IamRoleLambdaExecution.Properties.Policies[0].PolicyDocument.Statement[1] + ).to.deep.equal({ + Action: ['sns:Publish'], + Effect: 'Allow', + Resource: [ + { + 'Fn::GetAtt': ['SomeSNS', 'Arn'], + }, + { + Ref: 'SomeSNSArn', + }, + ], + }); + + // sqs with Fn::ImportValue + expect( + awsCompileStreamEvents.serverless.service.provider.compiledCloudFormationTemplate + .Resources.FirstEventSourceMappingDynamodbBar.Properties.DestinationConfig.OnFailure + .Destination + ).to.deep.equal({ 'Fn::ImportValue': 'ForeignSQS' }); + + // sqs with Fn::Join + expect( + awsCompileStreamEvents.serverless.service.provider.compiledCloudFormationTemplate + .Resources.FirstEventSourceMappingDynamodbBaz.Properties.DestinationConfig.OnFailure + .Destination + ).to.deep.equal({ + 'Fn::Join': [ + ':', + [ + 'arn', + 'aws', + 'sqs', + { + Ref: 'AWS::Region', + }, + { + Ref: 'AWS::AccountId', + }, + 'MyQueue', + ], + ], + }); + + expect( + awsCompileStreamEvents.serverless.service.provider.compiledCloudFormationTemplate + .Resources.IamRoleLambdaExecution.Properties.Policies[0].PolicyDocument.Statement[2] + ).to.deep.equal({ + Effect: 'Allow', + Action: ['sqs:ListQueues', 'sqs:SendMessage'], + Resource: [ + { + 'Fn::ImportValue': 'ForeignSQS', + }, + { + 'Fn::Join': [ + ':', + [ + 'arn', + 'aws', + 'sqs', + { + Ref: 'AWS::Region', + }, + { + Ref: 'AWS::AccountId', + }, + 'MyQueue', + ], + ], + }, + { + Ref: 'ForeignSQSArn', + }, + ], + }); + }); + it('fails if Ref/dynamic stream ARN is used without defining it to the CF parameters', () => { awsCompileStreamEvents.serverless.service.functions = { first: { @@ -690,6 +907,27 @@ describe('AwsCompileStreamEvents', () => { expect(() => awsCompileStreamEvents.compileStreamEvents()).to.throw(Error); }); + it('fails if Ref/dynamic onFailure ARN is used without defining it to the CF parameters', () => { + awsCompileStreamEvents.serverless.service.functions = { + first: { + events: [ + { + stream: { + arn: 'arn:aws:dynamodb:region:account:table/fizz/stream/1', + destinations: { + onFailure: { + arn: { Ref: 'ForeignSQSArn' }, + }, + }, + }, + }, + ], + }, + }; + + expect(() => awsCompileStreamEvents.compileStreamEvents()).to.throw(Error); + }); + it('fails if Fn::GetAtt/dynamic stream ARN is used without a type', () => { awsCompileStreamEvents.serverless.service.functions = { first: { @@ -706,6 +944,27 @@ describe('AwsCompileStreamEvents', () => { expect(() => awsCompileStreamEvents.compileStreamEvents()).to.throw(Error); }); + it('fails if Fn::GetAtt/dynamic onFailure ARN is used without a type', () => { + awsCompileStreamEvents.serverless.service.functions = { + first: { + events: [ + { + stream: { + arn: 'arn:aws:dynamodb:region:account:table/foo/stream/1', + destinations: { + onFailure: { + arn: { 'Fn::GetAtt': ['SomeSNS', 'Arn'] }, + }, + }, + }, + }, + ], + }, + }; + + expect(() => awsCompileStreamEvents.compileStreamEvents()).to.throw(Error); + }); + it('fails if keys other than Fn::GetAtt/ImportValue/Join are used for dynamic stream ARN', () => { awsCompileStreamEvents.serverless.service.functions = { first: { @@ -726,6 +985,183 @@ describe('AwsCompileStreamEvents', () => { expect(() => awsCompileStreamEvents.compileStreamEvents()).to.throw(Error); }); + it('fails if keys other than Fn::GetAtt/ImportValue/Join are used for dynamic onFailure ARN', () => { + awsCompileStreamEvents.serverless.service.functions = { + first: { + events: [ + { + stream: { + arn: 'arn:aws:dynamodb:region:account:table/foo/stream/1', + destinations: { + onFailure: { + arn: { + 'Fn::GetAtt': ['SomeSNS', 'Arn'], + 'batchSize': 1, + }, + type: 'sns', + }, + }, + }, + }, + ], + }, + }; + + expect(() => awsCompileStreamEvents.compileStreamEvents()).to.throw(Error); + }); + + it('fails if Fn::ImportValue is misused for onFailure ARN', () => { + awsCompileStreamEvents.serverless.service.functions = { + first: { + events: [ + { + stream: { + arn: 'arn:aws:dynamodb:region:account:table/foo/stream/1', + destinations: { + onFailure: { + arn: { + 'Fn::ImportValue': { + 'Fn::GetAtt': ['SomeSNS', 'Arn'], + }, + }, + type: 'invalidType', + }, + }, + }, + }, + ], + }, + }; + + expect(() => awsCompileStreamEvents.compileStreamEvents()).to.throw(Error); + }); + + it('fails if onFailure ARN is given as a string that does not start with arn', () => { + awsCompileStreamEvents.serverless.service.functions = { + first: { + events: [ + { + stream: { + arn: 'arn:aws:dynamodb:region:account:table/foo/stream/1', + destinations: { + onFailure: 'invalidARN', + }, + }, + }, + ], + }, + }; + + expect(() => awsCompileStreamEvents.compileStreamEvents()).to.throw(Error); + }); + + it('fails if onFailure ARN is given as a variable type other than string or object', () => { + awsCompileStreamEvents.serverless.service.functions = { + first: { + events: [ + { + stream: { + arn: 'arn:aws:dynamodb:region:account:table/foo/stream/1', + destinations: { + onFailure: 3, + }, + }, + }, + ], + }, + }; + + expect(() => awsCompileStreamEvents.compileStreamEvents()).to.throw(Error); + }); + + it('fails if nested onFailure ARN is given as a string that does not start with arn', () => { + awsCompileStreamEvents.serverless.service.functions = { + first: { + events: [ + { + stream: { + arn: 'arn:aws:dynamodb:region:account:table/foo/stream/1', + destinations: { + onFailure: { + arn: 'invalidARN', + }, + }, + }, + }, + ], + }, + }; + + expect(() => awsCompileStreamEvents.compileStreamEvents()).to.throw(Error); + }); + + it('fails if no arn key is given for a dynamic onFailure ARN', () => { + awsCompileStreamEvents.serverless.service.functions = { + first: { + events: [ + { + stream: { + arn: 'arn:aws:dynamodb:region:account:table/foo/stream/1', + destinations: { + onFailure: { + notarn: ['SomeSNS', 'Arn'], + }, + }, + }, + }, + ], + }, + }; + + expect(() => awsCompileStreamEvents.compileStreamEvents()).to.throw(Error); + }); + + it('fails if destinations structure is wrong', () => { + awsCompileStreamEvents.serverless.service.functions = { + first: { + events: [ + { + stream: { + arn: 'arn:aws:dynamodb:region:account:table/foo/stream/1', + destinations: { + notOnFailure: { + arn: { + 'Fn::GetAtt': ['SomeSNS', 'Arn'], + 'batchSize': 1, + }, + }, + }, + }, + }, + ], + }, + }; + + expect(() => awsCompileStreamEvents.compileStreamEvents()).to.throw(Error); + }); + + it('fails if invalid onFailure type is given', () => { + awsCompileStreamEvents.serverless.service.functions = { + first: { + events: [ + { + stream: { + arn: 'arn:aws:dynamodb:region:account:table/foo/stream/1', + destinations: { + onFailure: { + arn: { 'Fn::GetAtt': ['SomeSNS', 'Arn'] }, + type: 'invalidType', + }, + }, + }, + }, + ], + }, + }; + + expect(() => awsCompileStreamEvents.compileStreamEvents()).to.throw(Error); + }); + it('should add the necessary IAM role statements', () => { awsCompileStreamEvents.serverless.service.functions = { first: { @@ -734,7 +1170,12 @@ describe('AwsCompileStreamEvents', () => { stream: 'arn:aws:dynamodb:region:account:table/foo/stream/1', }, { - stream: 'arn:aws:dynamodb:region:account:table/bar/stream/2', + stream: { + arn: 'arn:aws:dynamodb:region:account:table/bar/stream/2', + destinations: { + onFailure: 'arn:aws:sns:region:account:snstopic', + }, + }, }, ], }, @@ -754,6 +1195,11 @@ describe('AwsCompileStreamEvents', () => { 'arn:aws:dynamodb:region:account:table/bar/stream/2', ], }, + { + Effect: 'Allow', + Action: ['sns:Publish'], + Resource: ['arn:aws:sns:region:account:snstopic'], + }, ]; awsCompileStreamEvents.compileStreamEvents(); @@ -795,6 +1241,14 @@ describe('AwsCompileStreamEvents', () => { bisectBatchOnFunctionError: true, }, }, + { + stream: { + arn: 'arn:aws:kinesis:region:account:table/fizz/stream/5', + destinations: { + onFailure: 'arn:aws:sns:region:account:snstopic', + }, + }, + }, ], }, }; @@ -940,6 +1394,40 @@ describe('AwsCompileStreamEvents', () => { awsCompileStreamEvents.serverless.service.provider.compiledCloudFormationTemplate .Resources.FirstEventSourceMappingKinesisBuzz.Properties.BisectBatchOnFunctionError ).to.equal(true); + + // event 5 + expect( + awsCompileStreamEvents.serverless.service.provider.compiledCloudFormationTemplate + .Resources.FirstEventSourceMappingKinesisFizz.Type + ).to.equal('AWS::Lambda::EventSourceMapping'); + expect( + awsCompileStreamEvents.serverless.service.provider.compiledCloudFormationTemplate + .Resources.FirstEventSourceMappingKinesisFizz.DependsOn + ).to.equal('IamRoleLambdaExecution'); + expect( + awsCompileStreamEvents.serverless.service.provider.compiledCloudFormationTemplate + .Resources.FirstEventSourceMappingKinesisFizz.Properties.EventSourceArn + ).to.equal(awsCompileStreamEvents.serverless.service.functions.first.events[4].stream.arn); + expect( + awsCompileStreamEvents.serverless.service.provider.compiledCloudFormationTemplate + .Resources.FirstEventSourceMappingKinesisFizz.Properties.BatchSize + ).to.equal(10); + expect( + awsCompileStreamEvents.serverless.service.provider.compiledCloudFormationTemplate + .Resources.FirstEventSourceMappingKinesisFizz.Properties.StartingPosition + ).to.equal('TRIM_HORIZON'); + expect( + awsCompileStreamEvents.serverless.service.provider.compiledCloudFormationTemplate + .Resources.FirstEventSourceMappingKinesisFizz.Properties.Enabled + ).to.equal('True'); + expect( + awsCompileStreamEvents.serverless.service.provider.compiledCloudFormationTemplate + .Resources.FirstEventSourceMappingKinesisFizz.Properties.DestinationConfig.OnFailure + .Destination + ).to.equal( + awsCompileStreamEvents.serverless.service.functions.first.events[4].stream.destinations + .onFailure + ); }); it('should add the necessary IAM role statements', () => {