Merge pull request #1962 from murgatroid99/upmerge_1.4.3

Merge 1.4.x into master
This commit is contained in:
Michael Lumish 2021-11-08 09:32:45 -08:00 committed by GitHub
commit ff387c9cc7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 258 additions and 92 deletions

View File

@ -137,17 +137,21 @@ export class CdsState implements XdsStreamState<Cluster__Output> {
}
handleResponses(responses: Cluster__Output[], isV2: boolean): string | null {
const validResponses: Cluster__Output[] = [];
let errorMessage: string | null = null;
for (const message of responses) {
if (!this.validateResponse(message)) {
if (this.validateResponse(message)) {
validResponses.push(message);
} else {
trace('CDS validation failed for message ' + JSON.stringify(message));
return 'CDS Error: Cluster validation failed';
errorMessage = 'CDS Error: Cluster validation failed';
}
}
this.latestResponses = responses;
this.latestResponses = validResponses;
this.latestIsV2 = isV2;
const allEdsServiceNames: Set<string> = new Set<string>();
const allClusterNames: Set<string> = new Set<string>();
for (const message of responses) {
for (const message of validResponses) {
allClusterNames.add(message.name);
const edsServiceName = message.eds_cluster_config?.service_name ?? '';
allEdsServiceNames.add(
@ -161,7 +165,7 @@ export class CdsState implements XdsStreamState<Cluster__Output> {
trace('Received CDS updates for cluster names [' + Array.from(allClusterNames) + ']');
this.handleMissingNames(allClusterNames);
this.edsState.handleMissingNames(allEdsServiceNames);
return null;
return errorMessage;
}
reportStreamError(status: StatusObject): void {

View File

@ -146,16 +146,20 @@ export class EdsState implements XdsStreamState<ClusterLoadAssignment__Output> {
}
handleResponses(responses: ClusterLoadAssignment__Output[], isV2: boolean) {
const validResponses: ClusterLoadAssignment__Output[] = [];
let errorMessage: string | null = null;
for (const message of responses) {
if (!this.validateResponse(message)) {
if (this.validateResponse(message)) {
validResponses.push(message);
} else {
trace('EDS validation failed for message ' + JSON.stringify(message));
return 'EDS Error: ClusterLoadAssignment validation failed';
errorMessage = 'EDS Error: ClusterLoadAssignment validation failed';
}
}
this.latestResponses = responses;
this.latestResponses = validResponses;
this.latestIsV2 = isV2;
const allClusterNames: Set<string> = new Set<string>();
for (const message of responses) {
for (const message of validResponses) {
allClusterNames.add(message.cluster_name);
const watchers = this.watchers.get(message.cluster_name) ?? [];
for (const watcher of watchers) {
@ -163,7 +167,7 @@ export class EdsState implements XdsStreamState<ClusterLoadAssignment__Output> {
}
}
trace('Received EDS updates for cluster names [' + Array.from(allClusterNames) + ']');
return null;
return errorMessage;
}
reportStreamError(status: StatusObject): void {

View File

@ -154,17 +154,21 @@ export class LdsState implements XdsStreamState<Listener__Output> {
}
handleResponses(responses: Listener__Output[], isV2: boolean): string | null {
const validResponses: Listener__Output[] = [];
let errorMessage: string | null = null;
for (const message of responses) {
if (!this.validateResponse(message, isV2)) {
if (this.validateResponse(message, isV2)) {
validResponses.push(message);
} else {
trace('LDS validation failed for message ' + JSON.stringify(message));
return 'LDS Error: Route validation failed';
errorMessage = 'LDS Error: Route validation failed';
}
}
this.latestResponses = responses;
this.latestResponses = validResponses;
this.latestIsV2 = isV2;
const allTargetNames = new Set<string>();
const allRouteConfigNames = new Set<string>();
for (const message of responses) {
for (const message of validResponses) {
allTargetNames.add(message.name);
const httpConnectionManager = decodeSingleResource(HTTP_CONNECTION_MANGER_TYPE_URL_V3, message.api_listener!.api_listener!.value);
if (httpConnectionManager.rds) {
@ -178,7 +182,7 @@ export class LdsState implements XdsStreamState<Listener__Output> {
trace('Received LDS response with listener names [' + Array.from(allTargetNames) + ']');
this.handleMissingNames(allTargetNames);
this.rdsState.handleMissingNames(allRouteConfigNames);
return null;
return errorMessage;
}
reportStreamError(status: StatusObject): void {

View File

@ -183,16 +183,20 @@ export class RdsState implements XdsStreamState<RouteConfiguration__Output> {
}
handleResponses(responses: RouteConfiguration__Output[], isV2: boolean): string | null {
const validResponses: RouteConfiguration__Output[] = [];
let errorMessage: string | null = null;
for (const message of responses) {
if (!this.validateResponse(message, isV2)) {
if (this.validateResponse(message, isV2)) {
validResponses.push(message);
} else {
trace('RDS validation failed for message ' + JSON.stringify(message));
return 'RDS Error: Route validation failed';
errorMessage = 'RDS Error: Route validation failed';
}
}
this.latestResponses = responses;
this.latestResponses = validResponses;
this.latestIsV2 = isV2;
const allRouteConfigNames = new Set<string>();
for (const message of responses) {
for (const message of validResponses) {
allRouteConfigNames.add(message.name);
const watchers = this.watchers.get(message.name) ?? [];
for (const watcher of watchers) {
@ -200,7 +204,7 @@ export class RdsState implements XdsStreamState<RouteConfiguration__Output> {
}
}
trace('Received RDS response with route config names [' + Array.from(allRouteConfigNames) + ']');
return null;
return errorMessage;
}
reportStreamError(status: StatusObject): void {

View File

@ -1,6 +1,6 @@
{
"name": "@grpc/grpc-js",
"version": "1.4.1",
"version": "1.4.3",
"description": "gRPC Library for Node - pure JS implementation",
"homepage": "https://grpc.io/",
"repository": "https://github.com/grpc/grpc-node/tree/master/packages/grpc-js",

View File

@ -36,6 +36,7 @@ export interface ChannelOptions {
'grpc.enable_http_proxy'?: number;
'grpc.http_connect_target'?: string;
'grpc.http_connect_creds'?: string;
'grpc.enable_channelz'?: number;
'grpc-node.max_session_memory'?: number;
// eslint-disable-next-line @typescript-eslint/no-explicit-any
[key: string]: any;
@ -61,6 +62,7 @@ export const recognizedOptions = {
'grpc.max_send_message_length': true,
'grpc.max_receive_message_length': true,
'grpc.enable_http_proxy': true,
'grpc.enable_channelz': true,
'grpc-node.max_session_memory': true,
};

View File

@ -172,6 +172,7 @@ export class ChannelImplementation implements Channel {
private configSelector: ConfigSelector | null = null;
// Channelz info
private readonly channelzEnabled: boolean = true;
private originalTarget: string;
private channelzRef: ChannelRef;
private channelzTrace: ChannelzTrace;
@ -213,9 +214,22 @@ export class ChannelImplementation implements Channel {
this.callRefTimer = setInterval(() => {}, MAX_TIMEOUT_TIME);
this.callRefTimer.unref?.();
this.channelzRef = registerChannelzChannel(target, () => this.getChannelzInfo());
if (this.options['grpc.enable_channelz'] === 0) {
this.channelzEnabled = false;
}
this.channelzTrace = new ChannelzTrace();
this.channelzTrace.addTrace('CT_INFO', 'Channel created');
if (this.channelzEnabled) {
this.channelzRef = registerChannelzChannel(target, () => this.getChannelzInfo());
this.channelzTrace.addTrace('CT_INFO', 'Channel created');
} else {
// Dummy channelz ref that will never be used
this.channelzRef = {
kind: 'channel',
id: -1,
name: ''
};
}
if (this.options['grpc.default_authority']) {
this.defaultAuthority = this.options['grpc.default_authority'] as string;
@ -242,7 +256,9 @@ export class ChannelImplementation implements Channel {
Object.assign({}, this.options, subchannelArgs),
this.credentials
);
this.channelzTrace.addTrace('CT_INFO', 'Created subchannel or used existing subchannel', subchannel.getChannelzRef());
if (this.channelzEnabled) {
this.channelzTrace.addTrace('CT_INFO', 'Created subchannel or used existing subchannel', subchannel.getChannelzRef());
}
return subchannel;
},
updateState: (connectivityState: ConnectivityState, picker: Picker) => {
@ -262,10 +278,14 @@ export class ChannelImplementation implements Channel {
);
},
addChannelzChild: (child: ChannelRef | SubchannelRef) => {
this.childrenTracker.refChild(child);
if (this.channelzEnabled) {
this.childrenTracker.refChild(child);
}
},
removeChannelzChild: (child: ChannelRef | SubchannelRef) => {
this.childrenTracker.unrefChild(child);
if (this.channelzEnabled) {
this.childrenTracker.unrefChild(child);
}
}
};
this.resolvingLoadBalancer = new ResolvingLoadBalancer(
@ -273,7 +293,9 @@ export class ChannelImplementation implements Channel {
channelControlHelper,
options,
(configSelector) => {
this.channelzTrace.addTrace('CT_INFO', 'Address resolution succeeded');
if (this.channelzEnabled) {
this.channelzTrace.addTrace('CT_INFO', 'Address resolution succeeded');
}
this.configSelector = configSelector;
/* We process the queue asynchronously to ensure that the corresponding
* load balancer update has completed. */
@ -288,7 +310,9 @@ export class ChannelImplementation implements Channel {
});
},
(status) => {
this.channelzTrace.addTrace('CT_WARNING', 'Address resolution failed with code ' + status.code + ' and details "' + status.details + '"');
if (this.channelzEnabled) {
this.channelzTrace.addTrace('CT_WARNING', 'Address resolution failed with code ' + status.code + ' and details "' + status.details + '"');
}
if (this.configSelectionQueue.length > 0) {
this.trace('Name resolution failed with calls queued for config selection');
}
@ -553,7 +577,9 @@ export class ChannelImplementation implements Channel {
' -> ' +
ConnectivityState[newState]
);
this.channelzTrace.addTrace('CT_INFO', ConnectivityState[this.connectivityState] + ' -> ' + ConnectivityState[newState]);
if (this.channelzEnabled) {
this.channelzTrace.addTrace('CT_INFO', ConnectivityState[this.connectivityState] + ' -> ' + ConnectivityState[newState]);
}
this.connectivityState = newState;
const watchersCopy = this.connectivityStateWatchers.slice();
for (const watcherObject of watchersCopy) {
@ -638,7 +664,9 @@ export class ChannelImplementation implements Channel {
this.resolvingLoadBalancer.destroy();
this.updateState(ConnectivityState.SHUTDOWN);
clearInterval(this.callRefTimer);
unregisterChannelzRef(this.channelzRef);
if (this.channelzEnabled) {
unregisterChannelzRef(this.channelzRef);
}
this.subchannelPool.unrefUnusedSubchannels();
}
@ -690,6 +718,11 @@ export class ChannelImplementation implements Channel {
this.connectivityStateWatchers.push(watcherObject);
}
/**
* Get the channelz reference object for this channel. The returned value is
* garbage if channelz is disabled for this channel.
* @returns
*/
getChannelzRef() {
return this.channelzRef;
}
@ -735,14 +768,16 @@ export class ChannelImplementation implements Channel {
this.credentials._getCallCredentials(),
callNumber
);
this.callTracker.addCallStarted();
stream.addStatusWatcher(status => {
if (status.code === Status.OK) {
this.callTracker.addCallSucceeded();
} else {
this.callTracker.addCallFailed();
}
});
if (this.channelzEnabled) {
this.callTracker.addCallStarted();
stream.addStatusWatcher(status => {
if (status.code === Status.OK) {
this.callTracker.addCallSucceeded();
} else {
this.callTracker.addCallFailed();
}
});
}
return stream;
}
}

View File

@ -113,6 +113,14 @@ interface TraceEvent {
childSubchannel?: SubchannelRef;
}
/**
* The loose upper bound on the number of events that should be retained in a
* trace. This may be exceeded by up to a factor of 2. Arbitrarily chosen as a
* number that should be large enough to contain the recent relevant
* information, but small enough to not use excessive memory.
*/
const TARGET_RETAINED_TRACES = 32;
export class ChannelzTrace {
events: TraceEvent[] = [];
creationTimestamp: Date;
@ -131,6 +139,10 @@ export class ChannelzTrace {
childChannel: child?.kind === 'channel' ? child : undefined,
childSubchannel: child?.kind === 'subchannel' ? child : undefined
});
// Whenever the trace array gets too large, discard the first half
if (this.events.length >= TARGET_RETAINED_TRACES * 2) {
this.events = this.events.slice(TARGET_RETAINED_TRACES);
}
this.eventsLogged += 1;
}
@ -380,18 +392,29 @@ export function unregisterChannelzRef(ref: ChannelRef | SubchannelRef | ServerRe
}
}
export interface ChannelzClientView {
updateState(connectivityState: ConnectivityState): void;
addTrace(severity: TraceSeverity, description: string, child?: ChannelRef | SubchannelRef): void;
addCallStarted(): void;
addCallSucceeded(): void;
addCallFailed(): void;
addChild(child: ChannelRef | SubchannelRef): void;
removeChild(child: ChannelRef | SubchannelRef): void;
/**
* Parse a single section of an IPv6 address as two bytes
* @param addressSection A hexadecimal string of length up to 4
* @returns The pair of bytes representing this address section
*/
function parseIPv6Section(addressSection: string): [number, number] {
const numberValue = Number.parseInt(addressSection, 16);
return [numberValue / 256 | 0, numberValue % 256];
}
export interface ChannelzSubchannelView extends ChannelzClientView {
getRef(): SubchannelRef;
/**
* Parse a chunk of an IPv6 address string to some number of bytes
* @param addressChunk Some number of segments of up to 4 hexadecimal
* characters each, joined by colons.
* @returns The list of bytes representing this address chunk
*/
function parseIPv6Chunk(addressChunk: string): number[] {
if (addressChunk === '') {
return [];
}
const bytePairs = addressChunk.split(':').map(section => parseIPv6Section(section));
const result: number[] = [];
return result.concat(...bytePairs);
}
/**
@ -405,17 +428,17 @@ function ipAddressStringToBuffer(ipAddress: string): Buffer | null {
return Buffer.from(Uint8Array.from(ipAddress.split('.').map(segment => Number.parseInt(segment))));
} else if (isIPv6(ipAddress)) {
let leftSection: string;
let rightSection: string | null;
let rightSection: string;
const doubleColonIndex = ipAddress.indexOf('::');
if (doubleColonIndex === -1) {
leftSection = ipAddress;
rightSection = null;
rightSection = '';
} else {
leftSection = ipAddress.substring(0, doubleColonIndex);
rightSection = ipAddress.substring(doubleColonIndex + 2);
}
const leftBuffer = Uint8Array.from(leftSection.split(':').map(segment => Number.parseInt(segment, 16)));
const rightBuffer = rightSection ? Uint8Array.from(rightSection.split(':').map(segment => Number.parseInt(segment, 16))) : new Uint8Array();
const leftBuffer = Buffer.from(parseIPv6Chunk(leftSection));
const rightBuffer = Buffer.from(parseIPv6Chunk(rightSection));
const middleBuffer = Buffer.alloc(16 - leftBuffer.length - rightBuffer.length, 0);
return Buffer.concat([leftBuffer, middleBuffer, rightBuffer]);
} else {

View File

@ -149,6 +149,7 @@ export class Server {
private options: ChannelOptions;
// Channelz Info
private readonly channelzEnabled: boolean = true;
private channelzRef: ServerRef;
private channelzTrace = new ChannelzTrace();
private callTracker = new ChannelzCallTracker();
@ -157,9 +158,20 @@ export class Server {
constructor(options?: ChannelOptions) {
this.options = options ?? {};
this.channelzRef = registerChannelzServer(() => this.getChannelzInfo());
this.channelzTrace.addTrace('CT_INFO', 'Server created');
this.trace('Server constructed');
if (this.options['grpc.enable_channelz'] === 0) {
this.channelzEnabled = false;
}
if (this.channelzEnabled) {
this.channelzRef = registerChannelzServer(() => this.getChannelzInfo());
this.channelzTrace.addTrace('CT_INFO', 'Server created');
this.trace('Server constructed');
} else {
// Dummy channelz ref that will never be used
this.channelzRef = {
kind: 'server',
id: -1
};
}
}
private getChannelzInfo(): ServerInfo {
@ -638,7 +650,9 @@ export class Server {
if (this.started === true) {
throw new Error('server is already started');
}
this.channelzTrace.addTrace('CT_INFO', 'Starting');
if (this.channelzEnabled) {
this.channelzTrace.addTrace('CT_INFO', 'Starting');
}
this.started = true;
}
@ -686,6 +700,11 @@ export class Server {
throw new Error('Not yet implemented');
}
/**
* Get the channelz reference object for this server. The returned value is
* garbage if channelz is disabled for this server.
* @returns
*/
getChannelzRef() {
return this.channelzRef;
}
@ -841,12 +860,16 @@ export class Server {
this.sessions.set(session, channelzSessionInfo);
const clientAddress = session.socket.remoteAddress;
this.channelzTrace.addTrace('CT_INFO', 'Connection established by client ' + clientAddress);
this.sessionChildrenTracker.refChild(channelzRef);
if (this.channelzEnabled) {
this.channelzTrace.addTrace('CT_INFO', 'Connection established by client ' + clientAddress);
this.sessionChildrenTracker.refChild(channelzRef);
}
session.on('close', () => {
this.channelzTrace.addTrace('CT_INFO', 'Connection dropped by client ' + clientAddress);
this.sessionChildrenTracker.unrefChild(channelzRef);
unregisterChannelzRef(channelzRef);
if (this.channelzEnabled) {
this.channelzTrace.addTrace('CT_INFO', 'Connection dropped by client ' + clientAddress);
this.sessionChildrenTracker.unrefChild(channelzRef);
unregisterChannelzRef(channelzRef);
}
this.sessions.delete(session);
});
});

View File

@ -157,6 +157,7 @@ export class Subchannel {
private subchannelAddressString: string;
// Channelz info
private readonly channelzEnabled: boolean = true;
private channelzRef: SubchannelRef;
private channelzTrace: ChannelzTrace;
private callTracker = new ChannelzCallTracker();
@ -226,9 +227,21 @@ export class Subchannel {
}, backoffOptions);
this.subchannelAddressString = subchannelAddressToString(subchannelAddress);
this.channelzRef = registerChannelzSubchannel(this.subchannelAddressString, () => this.getChannelzInfo());
if (options['grpc.enable_channelz'] === 0) {
this.channelzEnabled = false;
}
this.channelzTrace = new ChannelzTrace();
this.channelzTrace.addTrace('CT_INFO', 'Subchannel created');
if (this.channelzEnabled) {
this.channelzRef = registerChannelzSubchannel(this.subchannelAddressString, () => this.getChannelzInfo());
this.channelzTrace.addTrace('CT_INFO', 'Subchannel created');
} else {
// Dummy channelz ref that will never be used
this.channelzRef = {
kind: 'subchannel',
id: -1,
name: ''
};
}
this.trace('Subchannel constructed with options ' + JSON.stringify(options, undefined, 2));
}
@ -286,6 +299,9 @@ export class Subchannel {
}
private resetChannelzSocketInfo() {
if (!this.channelzEnabled) {
return;
}
if (this.channelzSocketRef) {
unregisterChannelzRef(this.channelzSocketRef);
this.childrenTracker.unrefChild(this.channelzSocketRef);
@ -335,7 +351,9 @@ export class Subchannel {
}
private sendPing() {
this.keepalivesSent += 1;
if (this.channelzEnabled) {
this.keepalivesSent += 1;
}
logging.trace(
LogVerbosity.DEBUG,
'keepalive',
@ -462,8 +480,10 @@ export class Subchannel {
connectionOptions
);
this.session = session;
this.channelzSocketRef = registerChannelzSocket(this.subchannelAddressString, () => this.getChannelzSocketInfo()!);
this.childrenTracker.refChild(this.channelzSocketRef);
if (this.channelzEnabled) {
this.channelzSocketRef = registerChannelzSocket(this.subchannelAddressString, () => this.getChannelzSocketInfo()!);
this.childrenTracker.refChild(this.channelzSocketRef);
}
session.unref();
/* For all of these events, check if the session at the time of the event
* is the same one currently attached to this subchannel, to ensure that
@ -615,7 +635,9 @@ export class Subchannel {
' -> ' +
ConnectivityState[newState]
);
this.channelzTrace.addTrace('CT_INFO', ConnectivityState[this.connectivityState] + ' -> ' + ConnectivityState[newState]);
if (this.channelzEnabled) {
this.channelzTrace.addTrace('CT_INFO', ConnectivityState[this.connectivityState] + ' -> ' + ConnectivityState[newState]);
}
const previousState = this.connectivityState;
this.connectivityState = newState;
switch (newState) {
@ -678,12 +700,16 @@ export class Subchannel {
/* If no calls, channels, or subchannel pools have any more references to
* this subchannel, we can be sure it will never be used again. */
if (this.callRefcount === 0 && this.refcount === 0) {
this.channelzTrace.addTrace('CT_INFO', 'Shutting down');
if (this.channelzEnabled) {
this.channelzTrace.addTrace('CT_INFO', 'Shutting down');
}
this.transitionToState(
[ConnectivityState.CONNECTING, ConnectivityState.READY],
ConnectivityState.TRANSIENT_FAILURE
);
unregisterChannelzRef(this.channelzRef);
if (this.channelzEnabled) {
unregisterChannelzRef(this.channelzRef);
}
}
}
@ -805,34 +831,43 @@ export class Subchannel {
' with headers\n' +
headersString
);
this.callTracker.addCallStarted();
callStream.addStatusWatcher(status => {
if (status.code === Status.OK) {
this.callTracker.addCallSucceeded();
} else {
this.callTracker.addCallFailed();
}
});
const streamSession = this.session;
this.streamTracker.addCallStarted();
callStream.addStreamEndWatcher(success => {
if (streamSession === this.session) {
if (success) {
this.streamTracker.addCallSucceeded();
let statsTracker: SubchannelCallStatsTracker;
if (this.channelzEnabled) {
this.callTracker.addCallStarted();
callStream.addStatusWatcher(status => {
if (status.code === Status.OK) {
this.callTracker.addCallSucceeded();
} else {
this.streamTracker.addCallFailed();
this.callTracker.addCallFailed();
}
});
this.streamTracker.addCallStarted();
callStream.addStreamEndWatcher(success => {
if (streamSession === this.session) {
if (success) {
this.streamTracker.addCallSucceeded();
} else {
this.streamTracker.addCallFailed();
}
}
});
statsTracker = {
addMessageSent: () => {
this.messagesSent += 1;
this.lastMessageSentTimestamp = new Date();
},
addMessageReceived: () => {
this.messagesReceived += 1;
}
}
});
callStream.attachHttp2Stream(http2Stream, this, extraFilters, {
addMessageSent: () => {
this.messagesSent += 1;
this.lastMessageSentTimestamp = new Date();
},
addMessageReceived: () => {
this.messagesReceived += 1;
} else {
statsTracker = {
addMessageSent: () => {},
addMessageReceived: () => {}
}
});
}
callStream.attachHttp2Stream(http2Stream, this, extraFilters, statsTracker);
}
/**

View File

@ -286,4 +286,36 @@ describe('Channelz', () => {
});
});
});
});
describe('Disabling channelz', () => {
let testServer: grpc.Server;
let testClient: ServiceClient;
beforeEach((done) => {
testServer = new grpc.Server({'grpc.enable_channelz': 0});
testServer.addService(TestServiceClient.service, testServiceImpl);
testServer.bindAsync('localhost:0', grpc.ServerCredentials.createInsecure(), (error, port) => {
if (error) {
done(error);
return;
}
testServer.start();
testClient = new TestServiceClient(`localhost:${port}`, grpc.credentials.createInsecure(), {'grpc.enable_channelz': 0});
done();
});
});
afterEach(() => {
testClient.close();
testServer.forceShutdown();
});
it('Should still work', (done) => {
const deadline = new Date();
deadline.setSeconds(deadline.getSeconds() + 1);
testClient.unary({}, {deadline}, (error: grpc.ServiceError, value: unknown) => {
assert.ifError(error);
done();
});
});
});