'use strict' // NOTE: `kafkajs` is bundled into the deployment package // eslint-disable-next-line import/no-unresolved const { Kafka } = require('kafkajs') function consumer(event, context, callback) { const functionName = 'consumer' const { records } = event const messages = Object.values(records)[0].map((record) => Buffer.from(record.value, 'base64').toString(), ) // eslint-disable-next-line no-console console.log(functionName, JSON.stringify(messages)) return callback(null, event) } async function producer() { const kafkaBrokers = process.env.BROKER_URLS.split(',') const kafkaTopic = process.env.TOPIC_NAME const kafka = new Kafka({ clientId: 'myapp', brokers: kafkaBrokers, ssl: true, }) const kafkaProducer = kafka.producer() await kafkaProducer.connect() await kafkaProducer.send({ topic: kafkaTopic, messages: [{ value: 'Hello from MSK Integration test!' }], }) await kafkaProducer.disconnect() return { statusCode: 200, } } module.exports = { producer, consumer }