feat(AWS MSK): Support AT_TIMESTAMP starting position (#12034)

This commit is contained in:
Ben 2023-06-22 12:19:32 +01:00 committed by GitHub
parent a50773b60d
commit 483ea166fc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 59 additions and 4 deletions

View File

@ -47,7 +47,8 @@ functions:
For the MSK event integration, you can set the `batchSize`, which effects how many messages can be processed in a single Lambda invocation. The default `batchSize` is 100, and the max `batchSize` is 10000.
Likewise `maximumBatchingWindow` can be set to determine the amount of time the Lambda spends gathering records before invoking the function. The default is 0, but **if you set `batchSize` to more than 10, you must set `maximumBatchingWindow` to at least 1**. The maximum is 300.
In addition, you can also configure `startingPosition`, which controls the position at which Lambda should start consuming messages from MSK topic. It supports two possible values, `TRIM_HORIZON` and `LATEST`, with `TRIM_HORIZON` being the default.
In addition, you can also configure `startingPosition`, which controls the position at which Lambda should start consuming messages from the topic. It supports three possible values, `TRIM_HORIZON`, `LATEST` and `AT_TIMESTAMP`, with `TRIM_HORIZON` being the default.
When `startingPosition` is configured as `AT_TIMESTAMP`, `startingPositionTimestamp` is also mandatory and is specified in Unix time seconds.
In the following example, we specify that the `compute` function should have an `msk` event configured with `batchSize` of 1000, `maximumBatchingWindow` to 30 seconds and `startingPosition` equal to `LATEST`.

View File

@ -977,8 +977,10 @@ functions:
batchSize: 100
# Optional, must be in 0-300 range (seconds)
maximumBatchingWindow: 30
# Optional, can be set to LATEST or TRIM_HORIZON
# Optional, can be set to LATEST, AT_TIMESTAMP or TRIM_HORIZON
startingPosition: LATEST
# Mandatory when startingPosition is AT_TIMESTAMP, must be in Unix time seconds
startingPositionTimestamp: 10000123
# (default: true)
enabled: false
# Optional, arn of the secret key for authenticating with the brokers in your MSK cluster.

View File

@ -2,6 +2,7 @@
const getMskClusterNameToken = require('./get-msk-cluster-name-token');
const resolveLambdaTarget = require('../../../../utils/resolve-lambda-target');
const ServerlessError = require('../../../../../../serverless-error');
const _ = require('lodash');
class AwsCompileMSKEvents {
@ -38,7 +39,10 @@ class AwsCompileMSKEvents {
},
startingPosition: {
type: 'string',
enum: ['LATEST', 'TRIM_HORIZON'],
enum: ['LATEST', 'TRIM_HORIZON', 'AT_TIMESTAMP'],
},
startingPositionTimestamp: {
type: 'number',
},
topic: {
type: 'string',
@ -88,6 +92,13 @@ class AwsCompileMSKEvents {
const maximumBatchingWindow = event.msk.maximumBatchingWindow;
const enabled = event.msk.enabled;
const startingPosition = event.msk.startingPosition || 'TRIM_HORIZON';
const startingPositionTimestamp = event.msk.startingPositionTimestamp;
if (startingPosition === 'AT_TIMESTAMP' && startingPositionTimestamp == null) {
throw new ServerlessError(
`You must specify startingPositionTimestamp for function: ${functionName} when startingPosition is AT_TIMESTAMP.`,
'FUNCTION_MSK_STARTING_POSITION_TIMESTAMP_INVALID'
);
}
const saslScram512 = event.msk.saslScram512;
const consumerGroupId = event.msk.consumerGroupId;
const filterPatterns = event.msk.filterPatterns;
@ -115,6 +126,10 @@ class AwsCompileMSKEvents {
},
};
if (startingPositionTimestamp != null) {
mskResource.Properties.StartingPositionTimestamp = startingPositionTimestamp;
}
if (batchSize) {
mskResource.Properties.BatchSize = batchSize;
}

View File

@ -11,7 +11,8 @@ describe('AwsCompileMSKEvents', () => {
const arn = 'arn:aws:kafka:us-east-1:111111111111:cluster/ClusterName/a1a1a1a1a1a1a1a1a';
const topic = 'TestingTopic';
const enabled = false;
const startingPosition = 'LATEST';
const startingPosition = 'AT_TIMESTAMP';
const startingPositionTimestamp = 123;
const batchSize = 5000;
const maximumBatchingWindow = 10;
const saslScram512 =
@ -56,6 +57,7 @@ describe('AwsCompileMSKEvents', () => {
maximumBatchingWindow,
enabled,
startingPosition,
startingPositionTimestamp,
saslScram512,
consumerGroupId,
filterPatterns,
@ -121,6 +123,7 @@ describe('AwsCompileMSKEvents', () => {
Enabled: enabled,
EventSourceArn: arn,
StartingPosition: startingPosition,
StartingPositionTimestamp: startingPositionTimestamp,
SourceAccessConfigurations: sourceAccessConfigurations,
Topics: [topic],
FunctionName: {
@ -145,6 +148,40 @@ describe('AwsCompileMSKEvents', () => {
},
});
});
describe('when startingPosition is AT_TIMESTAMP', () => {
it('if startingPosition is not provided, it should fail to compile EventSourceMapping resource properties', async () => {
await expect(
runServerless({
fixture: 'function',
configExt: {
functions: {
other: {
events: [
{
msk: {
topic,
arn,
batchSize,
maximumBatchingWindow,
enabled,
startingPosition,
saslScram512,
consumerGroupId,
filterPatterns,
},
},
],
},
},
},
command: 'package',
})
).to.be.rejected.and.eventually.contain({
code: 'FUNCTION_MSK_STARTING_POSITION_TIMESTAMP_INVALID',
});
});
});
});
describe('when no msk events are defined', () => {