mirror of
https://github.com/serverless/serverless.git
synced 2025-12-08 19:46:03 +00:00
428 lines
15 KiB
JavaScript
428 lines
15 KiB
JavaScript
import _ from 'lodash'
|
|
import resolveLambdaTarget from '../../../utils/resolve-lambda-target.js'
|
|
import ServerlessError from '../../../../../serverless-error.js'
|
|
|
|
class AwsCompileStreamEvents {
|
|
constructor(serverless) {
|
|
this.serverless = serverless
|
|
this.provider = this.serverless.getProvider('aws')
|
|
|
|
this.hooks = {
|
|
initialize: () => {
|
|
if (
|
|
Object.values(this.serverless.service.functions).some(({ events }) =>
|
|
events.some(({ stream: eventObject }) => {
|
|
const consumer = eventObject && eventObject.consumer
|
|
const usesServiceSpecificNamingMode =
|
|
this.serverless.service.provider.kinesis &&
|
|
this.serverless.service.provider.kinesis.consumerNamingMode &&
|
|
_.get(
|
|
this.serverless,
|
|
'service.provider.kinesis.consumerNamingMode',
|
|
) === 'serviceSpecific'
|
|
return consumer && !usesServiceSpecificNamingMode
|
|
}),
|
|
)
|
|
) {
|
|
this.serverless._logDeprecation(
|
|
'KINESIS_CONSUMER_NAME_CONTAINING_SERVICE',
|
|
'Starting with v4.0.0 Kinesis streams naming scheme will change. Adapt to new schema by setting`provider.kinesis.consumerNamingMode` property to `serviceSpecific`.',
|
|
)
|
|
}
|
|
},
|
|
'package:compileEvents': async () => this.compileStreamEvents(),
|
|
}
|
|
|
|
this.serverless.configSchemaHandler.defineFunctionEvent('aws', 'stream', {
|
|
anyOf: [
|
|
{ $ref: '#/definitions/awsArnString' },
|
|
{
|
|
type: 'object',
|
|
properties: {
|
|
// arn constraints are listed in oneOf property of this schema
|
|
arn: {},
|
|
type: { enum: ['dynamodb', 'kinesis'] },
|
|
batchSize: { type: 'integer', minimum: 1, maximum: 10000 },
|
|
parallelizationFactor: { type: 'integer', minimum: 1, maximum: 10 },
|
|
startingPosition: {
|
|
enum: ['LATEST', 'TRIM_HORIZON', 'AT_TIMESTAMP'],
|
|
},
|
|
startingPositionTimestamp: { type: 'number' },
|
|
enabled: { type: 'boolean' },
|
|
consumer: {
|
|
anyOf: [{ const: true }, { $ref: '#/definitions/awsArn' }],
|
|
},
|
|
batchWindow: { type: 'integer', minimum: 0, maximum: 300 },
|
|
maximumRetryAttempts: {
|
|
type: 'integer',
|
|
minimum: -1,
|
|
maximum: 10000,
|
|
},
|
|
bisectBatchOnFunctionError: { type: 'boolean' },
|
|
maximumRecordAgeInSeconds: {
|
|
anyOf: [
|
|
{ const: -1 },
|
|
{ type: 'integer', minimum: 60, maximum: 604800 },
|
|
],
|
|
},
|
|
functionResponseType: { enum: ['ReportBatchItemFailures'] },
|
|
destinations: {
|
|
type: 'object',
|
|
properties: {
|
|
onFailure: {
|
|
anyOf: [
|
|
{ $ref: '#/definitions/awsArnString' },
|
|
{
|
|
type: 'object',
|
|
properties: {
|
|
// arn constraints are listed in oneOf property of this schema
|
|
arn: {},
|
|
type: { enum: ['sns', 'sqs'] },
|
|
},
|
|
additionalProperties: false,
|
|
anyOf: [
|
|
{
|
|
properties: {
|
|
arn: { $ref: '#/definitions/awsCfFunction' },
|
|
},
|
|
required: ['arn', 'type'],
|
|
},
|
|
{
|
|
properties: {
|
|
arn: { $ref: '#/definitions/awsArnString' },
|
|
},
|
|
required: ['arn'],
|
|
},
|
|
],
|
|
},
|
|
],
|
|
},
|
|
},
|
|
additionalProperties: false,
|
|
required: ['onFailure'],
|
|
},
|
|
tumblingWindowInSeconds: {
|
|
type: 'integer',
|
|
minimum: 0,
|
|
maximum: 900,
|
|
},
|
|
filterPatterns: { $ref: '#/definitions/filterPatterns' },
|
|
},
|
|
additionalProperties: false,
|
|
anyOf: [
|
|
{
|
|
properties: {
|
|
arn: { $ref: '#/definitions/awsCfFunction' },
|
|
},
|
|
required: ['arn', 'type'],
|
|
},
|
|
{
|
|
properties: {
|
|
arn: { $ref: '#/definitions/awsArnString' },
|
|
},
|
|
required: ['arn'],
|
|
},
|
|
],
|
|
},
|
|
],
|
|
})
|
|
}
|
|
|
|
compileStreamEvents() {
|
|
this.serverless.service.getAllFunctions().forEach((functionName) => {
|
|
const functionObj = this.serverless.service.getFunction(functionName)
|
|
|
|
if (functionObj.events) {
|
|
const dynamodbStreamStatement = {
|
|
Effect: 'Allow',
|
|
Action: [
|
|
'dynamodb:GetRecords',
|
|
'dynamodb:GetShardIterator',
|
|
'dynamodb:DescribeStream',
|
|
'dynamodb:ListStreams',
|
|
],
|
|
Resource: [],
|
|
}
|
|
const kinesisStreamStatement = {
|
|
Effect: 'Allow',
|
|
Action: [
|
|
'kinesis:GetRecords',
|
|
'kinesis:GetShardIterator',
|
|
'kinesis:DescribeStream',
|
|
'kinesis:ListStreams',
|
|
],
|
|
Resource: [],
|
|
}
|
|
const kinesisStreamWithConsumerStatement = {
|
|
Effect: 'Allow',
|
|
Action: [
|
|
'kinesis:GetRecords',
|
|
'kinesis:GetShardIterator',
|
|
'kinesis:DescribeStreamSummary',
|
|
'kinesis:ListShards',
|
|
],
|
|
Resource: [],
|
|
}
|
|
const kinesisConsumerStatement = {
|
|
Effect: 'Allow',
|
|
Action: ['kinesis:SubscribeToShard'],
|
|
Resource: [],
|
|
}
|
|
const onFailureSnsStatement = {
|
|
Effect: 'Allow',
|
|
Action: ['sns:Publish'],
|
|
Resource: [],
|
|
}
|
|
const onFailureSqsStatement = {
|
|
Effect: 'Allow',
|
|
Action: ['sqs:ListQueues', 'sqs:SendMessage'],
|
|
Resource: [],
|
|
}
|
|
|
|
functionObj.events.forEach((event) => {
|
|
if (event.stream) {
|
|
let EventSourceArn
|
|
let BatchSize = 10
|
|
let ParallelizationFactor
|
|
let StartingPosition = 'TRIM_HORIZON'
|
|
let Enabled = true
|
|
|
|
if (typeof event.stream === 'object') {
|
|
EventSourceArn = event.stream.arn
|
|
BatchSize = event.stream.batchSize || BatchSize
|
|
if (event.stream.parallelizationFactor) {
|
|
ParallelizationFactor = event.stream.parallelizationFactor
|
|
}
|
|
StartingPosition =
|
|
event.stream.startingPosition || StartingPosition
|
|
|
|
if (typeof event.stream.enabled !== 'undefined') {
|
|
Enabled = event.stream.enabled
|
|
}
|
|
} else {
|
|
EventSourceArn = event.stream
|
|
}
|
|
|
|
const streamType = event.stream.type || EventSourceArn.split(':')[2]
|
|
const streamName = (function () {
|
|
if (EventSourceArn['Fn::GetAtt']) {
|
|
return EventSourceArn['Fn::GetAtt'][0]
|
|
} else if (EventSourceArn['Fn::ImportValue']) {
|
|
return EventSourceArn['Fn::ImportValue']
|
|
} else if (EventSourceArn.Ref) {
|
|
return EventSourceArn.Ref
|
|
} else if (EventSourceArn['Fn::Join']) {
|
|
// [0] is the used delimiter, [1] is the array with values
|
|
const name = EventSourceArn['Fn::Join'][1].slice(-1).pop()
|
|
if (name.split('/').length) {
|
|
return name.split('/').pop()
|
|
}
|
|
return name
|
|
}
|
|
return EventSourceArn.split('/')[1]
|
|
})()
|
|
|
|
const streamLogicalId = this.provider.naming.getStreamLogicalId(
|
|
functionName,
|
|
streamType,
|
|
streamName,
|
|
)
|
|
|
|
const dependsOn = []
|
|
const functionIamRoleResourceName =
|
|
this.provider.resolveFunctionIamRoleResourceName(functionObj)
|
|
if (functionIamRoleResourceName) {
|
|
dependsOn.push(functionIamRoleResourceName)
|
|
}
|
|
const { targetAlias } =
|
|
this.serverless.service.functions[functionName]
|
|
if (targetAlias) {
|
|
dependsOn.push(targetAlias.logicalId)
|
|
}
|
|
|
|
const streamResource = {
|
|
Type: 'AWS::Lambda::EventSourceMapping',
|
|
DependsOn: dependsOn,
|
|
Properties: {
|
|
BatchSize,
|
|
Enabled,
|
|
EventSourceArn,
|
|
FunctionName: resolveLambdaTarget(functionName, functionObj),
|
|
ParallelizationFactor,
|
|
StartingPosition,
|
|
},
|
|
}
|
|
|
|
// add event source ARNs to PolicyDocument statements
|
|
if (streamType === 'dynamodb') {
|
|
dynamodbStreamStatement.Resource.push(EventSourceArn)
|
|
} else if (event.stream.consumer) {
|
|
kinesisStreamWithConsumerStatement.Resource.push(EventSourceArn)
|
|
} else {
|
|
kinesisStreamStatement.Resource.push(EventSourceArn)
|
|
}
|
|
|
|
if (event.stream.batchWindow != null) {
|
|
streamResource.Properties.MaximumBatchingWindowInSeconds =
|
|
event.stream.batchWindow
|
|
}
|
|
|
|
if (event.stream.maximumRetryAttempts != null) {
|
|
streamResource.Properties.MaximumRetryAttempts =
|
|
event.stream.maximumRetryAttempts
|
|
}
|
|
|
|
if (event.stream.bisectBatchOnFunctionError != null) {
|
|
streamResource.Properties.BisectBatchOnFunctionError =
|
|
event.stream.bisectBatchOnFunctionError
|
|
}
|
|
|
|
if (event.stream.functionResponseType != null) {
|
|
streamResource.Properties.FunctionResponseTypes = [
|
|
event.stream.functionResponseType,
|
|
]
|
|
}
|
|
|
|
if (event.stream.maximumRecordAgeInSeconds) {
|
|
streamResource.Properties.MaximumRecordAgeInSeconds =
|
|
event.stream.maximumRecordAgeInSeconds
|
|
}
|
|
|
|
if (event.stream.tumblingWindowInSeconds != null) {
|
|
streamResource.Properties.TumblingWindowInSeconds =
|
|
event.stream.tumblingWindowInSeconds
|
|
}
|
|
|
|
if (event.stream.destinations) {
|
|
let OnFailureDestinationArn
|
|
|
|
if (typeof event.stream.destinations.onFailure === 'object') {
|
|
OnFailureDestinationArn =
|
|
event.stream.destinations.onFailure.arn
|
|
} else {
|
|
OnFailureDestinationArn = event.stream.destinations.onFailure
|
|
}
|
|
|
|
const destinationType =
|
|
event.stream.destinations.onFailure.type ||
|
|
OnFailureDestinationArn.split(':')[2]
|
|
// add on failure destination ARNs to PolicyDocument statements
|
|
if (destinationType === 'sns') {
|
|
onFailureSnsStatement.Resource.push(OnFailureDestinationArn)
|
|
} else {
|
|
onFailureSqsStatement.Resource.push(OnFailureDestinationArn)
|
|
}
|
|
|
|
streamResource.Properties.DestinationConfig = {
|
|
OnFailure: {
|
|
Destination: OnFailureDestinationArn,
|
|
},
|
|
}
|
|
}
|
|
|
|
if (event.stream.filterPatterns) {
|
|
streamResource.Properties.FilterCriteria = {
|
|
Filters: event.stream.filterPatterns.map((pattern) => ({
|
|
Pattern: JSON.stringify(pattern),
|
|
})),
|
|
}
|
|
}
|
|
|
|
const newStreamObject = {
|
|
[streamLogicalId]: streamResource,
|
|
}
|
|
|
|
if (event.stream.consumer && streamType === 'kinesis') {
|
|
if (event.stream.consumer === true) {
|
|
const consumerName = this.provider.naming.getStreamConsumerName(
|
|
functionName,
|
|
streamName,
|
|
)
|
|
const consumerResource = {
|
|
Type: 'AWS::Kinesis::StreamConsumer',
|
|
Properties: {
|
|
StreamARN: EventSourceArn,
|
|
ConsumerName: consumerName,
|
|
},
|
|
}
|
|
const consumerLogicalId =
|
|
this.provider.naming.getStreamConsumerLogicalId(consumerName)
|
|
newStreamObject[consumerLogicalId] = consumerResource
|
|
if (Array.isArray(streamResource.DependsOn)) {
|
|
streamResource.DependsOn.push(consumerLogicalId)
|
|
} else {
|
|
streamResource.DependsOn = [
|
|
streamResource.DependsOn,
|
|
consumerLogicalId,
|
|
]
|
|
}
|
|
const consumerArnRef = {
|
|
Ref: consumerLogicalId,
|
|
}
|
|
streamResource.Properties.EventSourceArn = consumerArnRef
|
|
kinesisConsumerStatement.Resource.push(consumerArnRef)
|
|
} else {
|
|
const consumerArn = event.stream.consumer
|
|
streamResource.Properties.EventSourceArn = consumerArn
|
|
kinesisConsumerStatement.Resource.push(consumerArn)
|
|
}
|
|
|
|
if (
|
|
event.stream.startingPosition === 'AT_TIMESTAMP' &&
|
|
!event.stream.startingPositionTimestamp
|
|
) {
|
|
throw new ServerlessError(
|
|
`You must specify startingPositionTimestamp for function: ${functionName} when startingPosition is AT_TIMESTAMP`,
|
|
'FUNCTION_STREAM_STARTING_POSITION_TIMESTAMP_INVALID',
|
|
)
|
|
}
|
|
|
|
if (event.stream.startingPositionTimestamp) {
|
|
streamResource.Properties.StartingPositionTimestamp =
|
|
event.stream.startingPositionTimestamp
|
|
}
|
|
}
|
|
|
|
_.merge(
|
|
this.serverless.service.provider.compiledCloudFormationTemplate
|
|
.Resources,
|
|
newStreamObject,
|
|
)
|
|
}
|
|
})
|
|
|
|
// update the PolicyDocument statements (if default policy is used)
|
|
if (
|
|
this.serverless.service.provider.compiledCloudFormationTemplate
|
|
.Resources.IamRoleLambdaExecution
|
|
) {
|
|
const statement =
|
|
this.serverless.service.provider.compiledCloudFormationTemplate
|
|
.Resources.IamRoleLambdaExecution.Properties.Policies[0]
|
|
.PolicyDocument.Statement
|
|
if (dynamodbStreamStatement.Resource.length) {
|
|
statement.push(dynamodbStreamStatement)
|
|
}
|
|
if (kinesisStreamStatement.Resource.length) {
|
|
statement.push(kinesisStreamStatement)
|
|
}
|
|
if (kinesisStreamWithConsumerStatement.Resource.length) {
|
|
statement.push(kinesisStreamWithConsumerStatement)
|
|
}
|
|
if (kinesisConsumerStatement.Resource.length) {
|
|
statement.push(kinesisConsumerStatement)
|
|
}
|
|
if (onFailureSnsStatement.Resource.length) {
|
|
statement.push(onFailureSnsStatement)
|
|
}
|
|
if (onFailureSqsStatement.Resource.length) {
|
|
statement.push(onFailureSqsStatement)
|
|
}
|
|
}
|
|
}
|
|
})
|
|
}
|
|
}
|
|
|
|
export default AwsCompileStreamEvents
|