mirror of
https://github.com/serverless/serverless.git
synced 2026-01-25 15:07:39 +00:00
194 lines
6.1 KiB
JavaScript
194 lines
6.1 KiB
JavaScript
import getMskClusterNameToken from './get-msk-cluster-name-token.js'
|
|
import resolveLambdaTarget from '../../../../utils/resolve-lambda-target.js'
|
|
import ServerlessError from '../../../../../../serverless-error.js'
|
|
import _ from 'lodash'
|
|
|
|
class AwsCompileMSKEvents {
|
|
constructor(serverless) {
|
|
this.serverless = serverless
|
|
this.provider = this.serverless.getProvider('aws')
|
|
|
|
this.hooks = {
|
|
'package:compileEvents': async () => this.compileMSKEvents(),
|
|
}
|
|
|
|
this.serverless.configSchemaHandler.defineFunctionEvent('aws', 'msk', {
|
|
type: 'object',
|
|
properties: {
|
|
arn: {
|
|
anyOf: [
|
|
{ $ref: '#/definitions/awsArnString' },
|
|
{ $ref: '#/definitions/awsCfImport' },
|
|
{ $ref: '#/definitions/awsCfRef' },
|
|
],
|
|
},
|
|
batchSize: {
|
|
type: 'number',
|
|
minimum: 1,
|
|
maximum: 10000,
|
|
},
|
|
maximumBatchingWindow: {
|
|
type: 'number',
|
|
minimum: 0,
|
|
maximum: 300,
|
|
},
|
|
enabled: {
|
|
type: 'boolean',
|
|
},
|
|
startingPosition: {
|
|
type: 'string',
|
|
enum: ['LATEST', 'TRIM_HORIZON', 'AT_TIMESTAMP'],
|
|
},
|
|
startingPositionTimestamp: {
|
|
type: 'number',
|
|
},
|
|
topic: {
|
|
type: 'string',
|
|
},
|
|
saslScram512: { $ref: '#/definitions/awsArnString' },
|
|
consumerGroupId: {
|
|
type: 'string',
|
|
maxLength: 200,
|
|
pattern: '[a-zA-Z0-9-/*:_+=.@-]*',
|
|
},
|
|
filterPatterns: { $ref: '#/definitions/filterPatterns' },
|
|
},
|
|
additionalProperties: false,
|
|
required: ['arn', 'topic'],
|
|
})
|
|
}
|
|
|
|
compileMSKEvents() {
|
|
this.serverless.service.getAllFunctions().forEach((functionName) => {
|
|
const functionObj = this.serverless.service.getFunction(functionName)
|
|
const cfTemplate =
|
|
this.serverless.service.provider.compiledCloudFormationTemplate
|
|
|
|
// It is required to add the following statement in order to be able to connect to MSK cluster
|
|
const ec2Statement = {
|
|
Effect: 'Allow',
|
|
Action: [
|
|
'ec2:CreateNetworkInterface',
|
|
'ec2:DescribeNetworkInterfaces',
|
|
'ec2:DescribeVpcs',
|
|
'ec2:DeleteNetworkInterface',
|
|
'ec2:DescribeSubnets',
|
|
'ec2:DescribeSecurityGroups',
|
|
],
|
|
Resource: '*',
|
|
}
|
|
const mskStatement = {
|
|
Effect: 'Allow',
|
|
Action: ['kafka:DescribeCluster', 'kafka:GetBootstrapBrokers'],
|
|
Resource: [],
|
|
}
|
|
|
|
functionObj.events.forEach((event) => {
|
|
if (event.msk) {
|
|
const eventSourceArn = event.msk.arn
|
|
const topic = event.msk.topic
|
|
const batchSize = event.msk.batchSize
|
|
const maximumBatchingWindow = event.msk.maximumBatchingWindow
|
|
const enabled = event.msk.enabled
|
|
const startingPosition = event.msk.startingPosition || 'TRIM_HORIZON'
|
|
const startingPositionTimestamp = event.msk.startingPositionTimestamp
|
|
if (
|
|
startingPosition === 'AT_TIMESTAMP' &&
|
|
startingPositionTimestamp == null
|
|
) {
|
|
throw new ServerlessError(
|
|
`You must specify startingPositionTimestamp for function: ${functionName} when startingPosition is AT_TIMESTAMP.`,
|
|
'FUNCTION_MSK_STARTING_POSITION_TIMESTAMP_INVALID',
|
|
)
|
|
}
|
|
const saslScram512 = event.msk.saslScram512
|
|
const consumerGroupId = event.msk.consumerGroupId
|
|
const filterPatterns = event.msk.filterPatterns
|
|
|
|
const mskClusterNameToken = getMskClusterNameToken(eventSourceArn)
|
|
const mskEventLogicalId = this.provider.naming.getMSKEventLogicalId(
|
|
functionName,
|
|
mskClusterNameToken,
|
|
topic,
|
|
)
|
|
|
|
const dependsOn = [
|
|
this.provider.resolveFunctionIamRoleResourceName(functionObj),
|
|
_.get(functionObj.targetAlias, 'logicalId'),
|
|
].filter(Boolean)
|
|
|
|
const mskResource = {
|
|
Type: 'AWS::Lambda::EventSourceMapping',
|
|
DependsOn: dependsOn,
|
|
Properties: {
|
|
EventSourceArn: eventSourceArn,
|
|
FunctionName: resolveLambdaTarget(functionName, functionObj),
|
|
StartingPosition: startingPosition,
|
|
Topics: [topic],
|
|
},
|
|
}
|
|
|
|
if (startingPositionTimestamp != null) {
|
|
mskResource.Properties.StartingPositionTimestamp =
|
|
startingPositionTimestamp
|
|
}
|
|
|
|
if (batchSize) {
|
|
mskResource.Properties.BatchSize = batchSize
|
|
}
|
|
|
|
if (maximumBatchingWindow) {
|
|
mskResource.Properties.MaximumBatchingWindowInSeconds =
|
|
maximumBatchingWindow
|
|
}
|
|
|
|
if (consumerGroupId) {
|
|
mskResource.Properties.AmazonManagedKafkaEventSourceConfig = {
|
|
ConsumerGroupId: consumerGroupId,
|
|
}
|
|
}
|
|
|
|
if (enabled != null) {
|
|
mskResource.Properties.Enabled = enabled
|
|
}
|
|
|
|
if (saslScram512 != null) {
|
|
const secureAccessConfigurations = [
|
|
{
|
|
Type: 'SASL_SCRAM_512_AUTH',
|
|
URI: saslScram512,
|
|
},
|
|
]
|
|
mskResource.Properties.SourceAccessConfigurations =
|
|
secureAccessConfigurations
|
|
}
|
|
|
|
if (filterPatterns) {
|
|
mskResource.Properties.FilterCriteria = {
|
|
Filters: filterPatterns.map((pattern) => ({
|
|
Pattern: JSON.stringify(pattern),
|
|
})),
|
|
}
|
|
}
|
|
|
|
mskStatement.Resource.push(eventSourceArn)
|
|
|
|
cfTemplate.Resources[mskEventLogicalId] = mskResource
|
|
}
|
|
})
|
|
|
|
if (cfTemplate.Resources.IamRoleLambdaExecution) {
|
|
const statement =
|
|
cfTemplate.Resources.IamRoleLambdaExecution.Properties.Policies[0]
|
|
.PolicyDocument.Statement
|
|
if (mskStatement.Resource.length) {
|
|
statement.push(mskStatement)
|
|
statement.push(ec2Statement)
|
|
}
|
|
}
|
|
})
|
|
}
|
|
}
|
|
|
|
export default AwsCompileMSKEvents
|