serverless/lib/cli/interactive-setup/console-dev-mode-feed.js
2023-03-20 15:48:16 +01:00

239 lines
7.7 KiB
JavaScript

'use strict';
const { writeText, style } = require('@serverless/utils/log');
const colorize = require('json-colorizer');
const WebSocket = require('ws');
const chalk = require('chalk');
const { devModeFeed } = require('@serverless/utils/lib/auth/urls');
const consoleUi = require('@serverless/utils/console-ui');
const streamBuffers = require('stream-buffers');
const apiRequest = require('@serverless/utils/api-request');
const promptWithHistory = require('@serverless/utils/inquirer/prompt-with-history');
// Buffer between the WebSocket feed and the renderer: frames pushed with `put` are re-emitted as
// `data` events in batches. Per the stream-buffers docs `frequency` is in milliseconds and
// `chunkSize` in bytes, so the chunk limit is large enough for a whole batch to arrive at once.
const streamBuff = new streamBuffers.ReadableStreamBuffer({
  chunkSize: 2048 * 1e6,
  frequency: 500,
});
// Tallies of rendered activity since the last usage report was published.
const consoleMonitoringCounter = {
  // Incremented for every `aws-lambda-response` activity.
  responses: 0,
  // Incremented for every `event` activity.
  events: 0,
  // Incremented for every `log` activity.
  logBatches: 0,
};
// json-colorizer options for error payloads: every token type is painted in the same red.
const jsonError = {
  colors: Object.fromEntries(
    [
      'BRACE',
      'BRACKET',
      'COLON',
      'COMMA',
      'STRING_KEY',
      'STRING_LITERAL',
      'NUMBER_LITERAL',
      'BOOLEAN_LITERAL',
      'NULL_LITERAL',
    ].map((tokenType) => [tokenType, '#FD5750'])
  ),
};
// json-colorizer options for regular payloads: grey throughout, with keys and colons in bold.
const jsonColors = (() => {
  const plain = 'grey';
  const emphasized = 'grey.bold';
  return {
    colors: {
      BRACE: plain,
      BRACKET: plain,
      COLON: emphasized,
      COMMA: plain,
      STRING_KEY: emphasized,
      STRING_LITERAL: plain,
      NUMBER_LITERAL: plain,
      BOOLEAN_LITERAL: plain,
      NULL_LITERAL: plain,
    },
  };
})();
// Styler applied to the one-line header printed above each activity entry.
const { grey: headerChalk } = chalk;
/**
 * Renders one buffered chunk of dev mode feed data to stdout.
 *
 * The chunk is a concatenation of raw WebSocket frames, each terminated with `;;;`. Only frames
 * that start with `[` are parsed (as JSON arrays of activities); all other frames are skipped.
 * Activities are filtered and ordered by `consoleUi.omitAndSortDevModeActivity` before printing,
 * and the usage counters are bumped for logs, events and invocation responses.
 *
 * Any failure while parsing or rendering is printed to stdout instead of being thrown, so a bad
 * chunk cannot take down the `data` listener.
 *
 * @param {Buffer|string} data Chunk emitted by the stream buffer.
 */
const handleSocketMessage = (data) => {
  try {
    const activities = data
      .toString('utf-8')
      .split(';;;')
      .filter((item) => item !== '' && item.startsWith('['))
      .flatMap((item) => JSON.parse(item));

    for (const activity of consoleUi.omitAndSortDevModeActivity(activities)) {
      const resourceName = ((activity.tags || {}).aws || {}).resourceName;
      const time = consoleUi.formatConsoleDate(new Date(activity.timestamp));
      switch (activity.type) {
        case 'log': {
          consoleMonitoringCounter.logBatches += 1;
          process.stdout.write(headerChalk(`${time}${resourceName} • Log\n`));
          try {
            const parsedBody = JSON.parse(activity.body);
            if (typeof parsedBody === 'string') {
              // A bare JSON string is plain text; print it through the raw fallback below.
              throw new Error('Not a JSON object');
            }
            process.stdout.write(`${colorize(JSON.stringify(parsedBody, null, 2), jsonColors)}\n`);
          } catch (error) {
            // Coerce first: a missing or non-string body must not throw from the fallback and
            // abort the rest of the batch.
            const rawBody = String(activity.body);
            process.stdout.write(
              chalk.grey.bold(`${rawBody}${rawBody.endsWith('\n') ? '' : '\n'}`)
            );
          }
          break;
        }
        case 'span': {
          const span = consoleUi.formatConsoleSpan(activity);
          process.stdout.write(
            headerChalk(`${time}${resourceName} • Span • ${span.niceName}\n`)
          );
          break;
        }
        case 'aws-lambda-request':
          process.stdout.write(headerChalk(`${time}${resourceName} • Invocation Started\n`));
          break;
        case 'aws-lambda-response':
          consoleMonitoringCounter.responses += 1;
          process.stdout.write(headerChalk(`${time}${resourceName} • Invocation Ended\n`));
          break;
        case 'event': {
          consoleMonitoringCounter.events += 1;
          const { message, payload } = consoleUi.formatConsoleEvent(activity);
          // Error events get a red header and a red payload.
          const isError = /ERROR •/.test(message);
          const headerWriter = isError ? chalk.hex('#FD5750') : headerChalk;
          const options = isError ? jsonError : jsonColors;
          process.stdout.write(headerWriter(`${time}${resourceName}${message}\n`));
          process.stdout.write(`${colorize(JSON.stringify(payload, null, 2), options)}\n`);
          break;
        }
        default:
        // Any other activity type is not rendered.
      }
    }
  } catch (error) {
    // `write` accepts only strings/buffers (and its second argument is an encoding), so the
    // error has to be stringified; passing the Error object itself would throw from here.
    process.stdout.write(`${String(error)}\n`);
  }
};
/**
 * Opens a WebSocket to the dev mode feed and subscribes it to the given functions.
 *
 * An org token is requested first and sent as the `Auth` query parameter. Once the socket opens,
 * an optional status line is printed and the subscription filters are sent. Every received frame
 * is appended to the shared stream buffer, terminated with `;;;`.
 *
 * @param {object} options
 * @param {*} options.functionName Forwarded as the `functionName` filter.
 * @param {string} options.region Region to filter on.
 * @param {string} options.accountId Account id to filter on.
 * @param {object} options.org Org whose `orgId` is used to request the token.
 * @param {string|false} options.state `'firstConnection'` or `'resume'` to print a status line;
 *   anything else connects silently.
 * @returns {Promise<WebSocket>} The socket, returned without waiting for it to open.
 */
const connectToWebSocket = async ({ functionName, region, accountId, org, state }) => {
  const tokenResponse = await apiRequest(`/api/identity/orgs/${org.orgId}/token`);
  const socket = new WebSocket(`${devModeFeed}?Auth=${tokenResponse.token}`, {
    perMessageDeflate: false,
  });

  socket.on('open', () => {
    switch (state) {
      case 'firstConnection':
        writeText(style.aside('Waiting for dev mode activity...\n'));
        break;
      case 'resume':
        writeText(style.aside('Resuming for dev mode activity...\n'));
        break;
      default:
      // Silent (re)connection.
    }
    const subscription = {
      filters: { functionName, region: [region], accountId: [accountId] },
    };
    socket.send(JSON.stringify(subscription));
  });

  socket.on('message', (frame) => {
    streamBuff.put(`${frame.toString('utf-8')};;;`);
  });

  return socket;
};
/**
 * Keeps the dev mode feed running until the user confirms they are done.
 *
 * While active it:
 *  - replaces the WebSocket connection with a fresh one every hour,
 *  - publishes the activity counters every 60 seconds,
 *  - after 1.5 hours without feed activity, stops both timers, terminates the connection and
 *    asks whether the user is still working. On "yes" a new connection is opened in `resume`
 *    state and the feed starts over; on "no" the returned promise resolves.
 *
 * @param {object} context Interactive setup context; reads `org`, `awsAccountId`,
 *   `consoleDevModeTargetFunctions`, `serverless.service.provider.region` and `stepHistory`.
 * @param {WebSocket} devModeFeedConnection Feed connection as returned by `connectToWebSocket`.
 * @returns {Promise<void>} Resolves when the user stops working; rejects if pausing or resuming
 *   the feed fails.
 */
const startDevModeFeed = async (context, devModeFeedConnection) =>
  new Promise((resolve, reject) => {
    // Connection currently being watched; swapped on every refresh.
    let activeConnection = devModeFeedConnection;

    const openConnection = (state) =>
      connectToWebSocket({
        functionName: context.consoleDevModeTargetFunctions,
        accountId: context.awsAccountId,
        region: context.serverless.service.provider.region,
        org: context.org,
        state,
      });

    const createStillWorkingTimeout = () =>
      setTimeout(async () => {
        try {
          clearInterval(eventPublishTimer);
          clearInterval(connectionRefreshTimer);
          activeConnection.terminate();
          writeText(style.aside('Pausing for dev mode activity.\n'));
          const shouldContinue = await promptWithHistory({
            name: 'shouldContinue',
            message: 'Are you still working?',
            stepHistory: context.stepHistory,
            type: 'confirm',
          });
          if (shouldContinue) {
            // The previous socket was terminated above, so resuming needs a new connection;
            // the `resume` state makes it announce itself once it opens.
            const resumedConnection = await openConnection('resume');
            await startDevModeFeed(context, resumedConnection);
          }
          resolve();
        } catch (error) {
          // Surface the failure to the caller instead of leaving the promise pending forever.
          reject(error);
        }
      }, 1000 * 60 * 60 * 1.5); // Check for activity every 1.5 hours
    let stillWorkingTimer = createStillWorkingTimeout();

    const watchStream = (feed) => {
      feed.on('message', (data) => {
        let parsedData;
        try {
          parsedData = JSON.parse(data.toString('utf-8'));
        } catch (error) {
          // Frames that are not valid JSON do not count as activity.
          return;
        }
        // Ignore connection message
        if (parsedData && parsedData.resetThrottle) return;
        clearTimeout(stillWorkingTimer);
        stillWorkingTimer = createStillWorkingTimeout();
      });
    };
    watchStream(activeConnection);

    const connectionRefreshTimer = setInterval(async () => {
      const newConnection = await openConnection(false);
      activeConnection.terminate();
      activeConnection = newConnection;
      watchStream(activeConnection);
    }, 1000 * 60 * 60); // Refresh every hour

    const eventPublishTimer = setInterval(async () => {
      // Take and reset the tallies before any await, so activity that arrives while the
      // requests are in flight is kept for the next report instead of being wiped.
      const { logBatches, responses, events } = consoleMonitoringCounter;
      consoleMonitoringCounter.logBatches = 0;
      consoleMonitoringCounter.responses = 0;
      consoleMonitoringCounter.events = 0;

      const { userId } = await apiRequest('/api/identity/me');
      const body = {
        source: 'web.dev_mode.activity.v1',
        event: {
          orgUid: context.org.orgId,
          userId,
          logBatches,
          responses,
          events,
          source: 'cli:serverless',
        },
      };
      await apiRequest('/api/events/publish', {
        method: 'POST',
        body,
      });
    }, 1000 * 60); // Publish every 60 seconds
  });
module.exports = {
async isApplicable(context) {
const { isConsoleDevMode, org, consoleDevModeTargetFunctions } = context;
if (!isConsoleDevMode) {
context.inapplicabilityReasonCode = 'NON_DEV_MODE_CONTEXT';
return false;
}
if (!org) {
context.inapplicabilityReasonCode = 'UNRESOLVED_ORG';
return false;
}
if (!consoleDevModeTargetFunctions) {
context.inapplicabilityReasonCode = 'NO_TARGET_FUNCTIONS';
return false;
}
return true;
},
async run(context) {
streamBuff.on('data', handleSocketMessage);
const devModeFeedConnection = await connectToWebSocket({
functionName: context.consoleDevModeTargetFunctions,
accountId: context.awsAccountId,
region: context.serverless.service.provider.region,
org: context.org,
state: 'firstConnection',
});
await startDevModeFeed(context, devModeFeedConnection);
},
};