From 5139b107e0f4318b3ee39b2af1daf5446eb05517 Mon Sep 17 00:00:00 2001 From: Michael Lumish Date: Mon, 18 Oct 2021 09:29:43 -0700 Subject: [PATCH 1/7] grpc-js: Limit the number of retained channelz trace events --- packages/grpc-js/package.json | 2 +- packages/grpc-js/src/channelz.ts | 26 ++++++++++++-------------- 2 files changed, 13 insertions(+), 15 deletions(-) diff --git a/packages/grpc-js/package.json b/packages/grpc-js/package.json index 9705383e..4acf948d 100644 --- a/packages/grpc-js/package.json +++ b/packages/grpc-js/package.json @@ -1,6 +1,6 @@ { "name": "@grpc/grpc-js", - "version": "1.4.1", + "version": "1.4.2", "description": "gRPC Library for Node - pure JS implementation", "homepage": "https://grpc.io/", "repository": "https://github.com/grpc/grpc-node/tree/master/packages/grpc-js", diff --git a/packages/grpc-js/src/channelz.ts b/packages/grpc-js/src/channelz.ts index 7efe5478..a1c85eb4 100644 --- a/packages/grpc-js/src/channelz.ts +++ b/packages/grpc-js/src/channelz.ts @@ -113,6 +113,14 @@ interface TraceEvent { childSubchannel?: SubchannelRef; } +/** + * The loose upper bound on the number of events that should be retained in a + * trace. This may be exceeded by up to a factor of 2. Arbitrarily chosen as a + * number that should be large enough to contain the recent relevant + * information, but small enough to not use excessive memory. + */ +const TARGET_RETAINED_TRACES = 32; + export class ChannelzTrace { events: TraceEvent[] = []; creationTimestamp: Date; @@ -131,6 +139,10 @@ export class ChannelzTrace { childChannel: child?.kind === 'channel' ? child : undefined, childSubchannel: child?.kind === 'subchannel' ? child : undefined }); + // Whenever the trace array gets too large, discard the first half + if (this.events.length >= TARGET_RETAINED_TRACES * 2) { + this.events = this.events.slice(TARGET_RETAINED_TRACES); + } this.eventsLogged += 1; } @@ -380,20 +392,6 @@ export function unregisterChannelzRef(ref: ChannelRef | SubchannelRef | ServerRe } } -export interface ChannelzClientView { - updateState(connectivityState: ConnectivityState): void; - addTrace(severity: TraceSeverity, description: string, child?: ChannelRef | SubchannelRef): void; - addCallStarted(): void; - addCallSucceeded(): void; - addCallFailed(): void; - addChild(child: ChannelRef | SubchannelRef): void; - removeChild(child: ChannelRef | SubchannelRef): void; -} - -export interface ChannelzSubchannelView extends ChannelzClientView { - getRef(): SubchannelRef; -} - /** * Converts an IPv4 or IPv6 address from string representation to binary * representation From 891a918c85d717c814d98ee0ce01363970b99007 Mon Sep 17 00:00:00 2001 From: Michael Lumish Date: Mon, 18 Oct 2021 13:31:08 -0700 Subject: [PATCH 2/7] grpc-js: Allow users to disable channelz --- packages/grpc-js/src/channel-options.ts | 2 + packages/grpc-js/src/channel.ts | 69 +++++++++++++----- packages/grpc-js/src/server.ts | 41 ++++++++--- packages/grpc-js/src/subchannel.ts | 94 ++++++++++++++++--------- 4 files changed, 147 insertions(+), 59 deletions(-) diff --git a/packages/grpc-js/src/channel-options.ts b/packages/grpc-js/src/channel-options.ts index 604fd868..4e2ee613 100644 --- a/packages/grpc-js/src/channel-options.ts +++ b/packages/grpc-js/src/channel-options.ts @@ -36,6 +36,7 @@ export interface ChannelOptions { 'grpc.enable_http_proxy'?: number; 'grpc.http_connect_target'?: string; 'grpc.http_connect_creds'?: string; + 'grpc.enable_channelz'?: number; 'grpc-node.max_session_memory'?: number; // eslint-disable-next-line @typescript-eslint/no-explicit-any [key: string]: any; @@ -61,6 +62,7 @@ export const recognizedOptions = { 'grpc.max_send_message_length': true, 'grpc.max_receive_message_length': true, 'grpc.enable_http_proxy': true, + 'grpc.enable_channelz': true, 'grpc-node.max_session_memory': true, }; diff --git a/packages/grpc-js/src/channel.ts b/packages/grpc-js/src/channel.ts index f998ecef..e442e6e6 100644 --- a/packages/grpc-js/src/channel.ts +++ b/packages/grpc-js/src/channel.ts @@ -172,6 +172,7 @@ export class ChannelImplementation implements Channel { private configSelector: ConfigSelector | null = null; // Channelz info + private readonly channelzEnabled: boolean = true; private originalTarget: string; private channelzRef: ChannelRef; private channelzTrace: ChannelzTrace; @@ -213,9 +214,22 @@ export class ChannelImplementation implements Channel { this.callRefTimer = setInterval(() => {}, MAX_TIMEOUT_TIME); this.callRefTimer.unref?.(); - this.channelzRef = registerChannelzChannel(target, () => this.getChannelzInfo()); + if (this.options['grpc.enable_channelz'] === 0) { + this.channelzEnabled = false; + } + this.channelzTrace = new ChannelzTrace(); - this.channelzTrace.addTrace('CT_INFO', 'Channel created'); + if (this.channelzEnabled) { + this.channelzRef = registerChannelzChannel(target, () => this.getChannelzInfo()); + this.channelzTrace.addTrace('CT_INFO', 'Channel created'); + } else { + // Dummy channelz ref that will never be used + this.channelzRef = { + kind: 'channel', + id: -1, + name: '' + }; + } if (this.options['grpc.default_authority']) { this.defaultAuthority = this.options['grpc.default_authority'] as string; @@ -242,7 +256,9 @@ export class ChannelImplementation implements Channel { Object.assign({}, this.options, subchannelArgs), this.credentials ); - this.channelzTrace.addTrace('CT_INFO', 'Created subchannel or used existing subchannel', subchannel.getChannelzRef()); + if (this.channelzEnabled) { + this.channelzTrace.addTrace('CT_INFO', 'Created subchannel or used existing subchannel', subchannel.getChannelzRef()); + } return subchannel; }, updateState: (connectivityState: ConnectivityState, picker: Picker) => { @@ -262,10 +278,14 @@ export class ChannelImplementation implements Channel { ); }, addChannelzChild: (child: ChannelRef | SubchannelRef) => { - this.childrenTracker.refChild(child); + if (this.channelzEnabled) { + this.childrenTracker.refChild(child); + } }, removeChannelzChild: (child: ChannelRef | SubchannelRef) => { - this.childrenTracker.unrefChild(child); + if (this.channelzEnabled) { + this.childrenTracker.unrefChild(child); + } } }; this.resolvingLoadBalancer = new ResolvingLoadBalancer( @@ -273,7 +293,9 @@ export class ChannelImplementation implements Channel { channelControlHelper, options, (configSelector) => { - this.channelzTrace.addTrace('CT_INFO', 'Address resolution succeeded'); + if (this.channelzEnabled) { + this.channelzTrace.addTrace('CT_INFO', 'Address resolution succeeded'); + } this.configSelector = configSelector; /* We process the queue asynchronously to ensure that the corresponding * load balancer update has completed. */ @@ -288,7 +310,9 @@ export class ChannelImplementation implements Channel { }); }, (status) => { - this.channelzTrace.addTrace('CT_WARNING', 'Address resolution failed with code ' + status.code + ' and details "' + status.details + '"'); + if (this.channelzEnabled) { + this.channelzTrace.addTrace('CT_WARNING', 'Address resolution failed with code ' + status.code + ' and details "' + status.details + '"'); + } if (this.configSelectionQueue.length > 0) { this.trace('Name resolution failed with calls queued for config selection'); } @@ -553,7 +577,9 @@ export class ChannelImplementation implements Channel { ' -> ' + ConnectivityState[newState] ); - this.channelzTrace.addTrace('CT_INFO', ConnectivityState[this.connectivityState] + ' -> ' + ConnectivityState[newState]); + if (this.channelzEnabled) { + this.channelzTrace.addTrace('CT_INFO', ConnectivityState[this.connectivityState] + ' -> ' + ConnectivityState[newState]); + } this.connectivityState = newState; const watchersCopy = this.connectivityStateWatchers.slice(); for (const watcherObject of watchersCopy) { @@ -638,7 +664,9 @@ export class ChannelImplementation implements Channel { this.resolvingLoadBalancer.destroy(); this.updateState(ConnectivityState.SHUTDOWN); clearInterval(this.callRefTimer); - unregisterChannelzRef(this.channelzRef); + if (this.channelzEnabled) { + unregisterChannelzRef(this.channelzRef); + } this.subchannelPool.unrefUnusedSubchannels(); } @@ -690,6 +718,11 @@ export class ChannelImplementation implements Channel { this.connectivityStateWatchers.push(watcherObject); } + /** + * Get the channelz reference object for this channel. The returned value is + * garbage if channelz is disabled for this channel. + * @returns + */ getChannelzRef() { return this.channelzRef; } @@ -735,14 +768,16 @@ export class ChannelImplementation implements Channel { this.credentials._getCallCredentials(), callNumber ); - this.callTracker.addCallStarted(); - stream.addStatusWatcher(status => { - if (status.code === Status.OK) { - this.callTracker.addCallSucceeded(); - } else { - this.callTracker.addCallFailed(); - } - }); + if (this.channelzEnabled) { + this.callTracker.addCallStarted(); + stream.addStatusWatcher(status => { + if (status.code === Status.OK) { + this.callTracker.addCallSucceeded(); + } else { + this.callTracker.addCallFailed(); + } + }); + } return stream; } } diff --git a/packages/grpc-js/src/server.ts b/packages/grpc-js/src/server.ts index aed04ade..8bfc2a5b 100644 --- a/packages/grpc-js/src/server.ts +++ b/packages/grpc-js/src/server.ts @@ -149,6 +149,7 @@ export class Server { private options: ChannelOptions; // Channelz Info + private readonly channelzEnabled: boolean = true; private channelzRef: ServerRef; private channelzTrace = new ChannelzTrace(); private callTracker = new ChannelzCallTracker(); @@ -157,9 +158,20 @@ export class Server { constructor(options?: ChannelOptions) { this.options = options ?? {}; - this.channelzRef = registerChannelzServer(() => this.getChannelzInfo()); - this.channelzTrace.addTrace('CT_INFO', 'Server created'); - this.trace('Server constructed'); + if (this.options['grpc.enable_channelz'] === 0) { + this.channelzEnabled = false; + } + if (this.channelzEnabled) { + this.channelzRef = registerChannelzServer(() => this.getChannelzInfo()); + this.channelzTrace.addTrace('CT_INFO', 'Server created'); + this.trace('Server constructed'); + } else { + // Dummy channelz ref that will never be used + this.channelzRef = { + kind: 'server', + id: -1 + }; + } } private getChannelzInfo(): ServerInfo { @@ -638,7 +650,9 @@ export class Server { if (this.started === true) { throw new Error('server is already started'); } - this.channelzTrace.addTrace('CT_INFO', 'Starting'); + if (this.channelzEnabled) { + this.channelzTrace.addTrace('CT_INFO', 'Starting'); + } this.started = true; } @@ -686,6 +700,11 @@ export class Server { throw new Error('Not yet implemented'); } + /** + * Get the channelz reference object for this server. The returned value is + * garbage if channelz is disabled for this server. + * @returns + */ getChannelzRef() { return this.channelzRef; } @@ -841,12 +860,16 @@ export class Server { this.sessions.set(session, channelzSessionInfo); const clientAddress = session.socket.remoteAddress; - this.channelzTrace.addTrace('CT_INFO', 'Connection established by client ' + clientAddress); - this.sessionChildrenTracker.refChild(channelzRef); + if (this.channelzEnabled) { + this.channelzTrace.addTrace('CT_INFO', 'Connection established by client ' + clientAddress); + this.sessionChildrenTracker.refChild(channelzRef); + } session.on('close', () => { - this.channelzTrace.addTrace('CT_INFO', 'Connection dropped by client ' + clientAddress); - this.sessionChildrenTracker.unrefChild(channelzRef); - unregisterChannelzRef(channelzRef); + if (this.channelzEnabled) { + this.channelzTrace.addTrace('CT_INFO', 'Connection dropped by client ' + clientAddress); + this.sessionChildrenTracker.unrefChild(channelzRef); + unregisterChannelzRef(channelzRef); + } this.sessions.delete(session); }); }); diff --git a/packages/grpc-js/src/subchannel.ts b/packages/grpc-js/src/subchannel.ts index a3b86b28..edc843a8 100644 --- a/packages/grpc-js/src/subchannel.ts +++ b/packages/grpc-js/src/subchannel.ts @@ -157,6 +157,7 @@ export class Subchannel { private subchannelAddressString: string; // Channelz info + private readonly channelzEnabled: boolean = true; private channelzRef: SubchannelRef; private channelzTrace: ChannelzTrace; private callTracker = new ChannelzCallTracker(); @@ -226,9 +227,21 @@ export class Subchannel { }, backoffOptions); this.subchannelAddressString = subchannelAddressToString(subchannelAddress); - this.channelzRef = registerChannelzSubchannel(this.subchannelAddressString, () => this.getChannelzInfo()); + if (options['grpc.enable_channelz'] === 0) { + this.channelzEnabled = false; + } this.channelzTrace = new ChannelzTrace(); - this.channelzTrace.addTrace('CT_INFO', 'Subchannel created'); + if (this.channelzEnabled) { + this.channelzRef = registerChannelzSubchannel(this.subchannelAddressString, () => this.getChannelzInfo()); + this.channelzTrace.addTrace('CT_INFO', 'Subchannel created'); + } else { + // Dummy channelz ref that will never be used + this.channelzRef = { + kind: 'subchannel', + id: -1, + name: '' + }; + } this.trace('Subchannel constructed with options ' + JSON.stringify(options, undefined, 2)); } @@ -286,6 +299,9 @@ export class Subchannel { } private resetChannelzSocketInfo() { + if (!this.channelzEnabled) { + return; + } if (this.channelzSocketRef) { unregisterChannelzRef(this.channelzSocketRef); this.childrenTracker.unrefChild(this.channelzSocketRef); @@ -335,7 +351,9 @@ export class Subchannel { } private sendPing() { - this.keepalivesSent += 1; + if (this.channelzEnabled) { + this.keepalivesSent += 1; + } logging.trace( LogVerbosity.DEBUG, 'keepalive', @@ -462,8 +480,10 @@ export class Subchannel { connectionOptions ); this.session = session; - this.channelzSocketRef = registerChannelzSocket(this.subchannelAddressString, () => this.getChannelzSocketInfo()!); - this.childrenTracker.refChild(this.channelzSocketRef); + if (this.channelzEnabled) { + this.channelzSocketRef = registerChannelzSocket(this.subchannelAddressString, () => this.getChannelzSocketInfo()!); + this.childrenTracker.refChild(this.channelzSocketRef); + } session.unref(); /* For all of these events, check if the session at the time of the event * is the same one currently attached to this subchannel, to ensure that @@ -615,7 +635,9 @@ export class Subchannel { ' -> ' + ConnectivityState[newState] ); - this.channelzTrace.addTrace('CT_INFO', ConnectivityState[this.connectivityState] + ' -> ' + ConnectivityState[newState]); + if (this.channelzEnabled) { + this.channelzTrace.addTrace('CT_INFO', ConnectivityState[this.connectivityState] + ' -> ' + ConnectivityState[newState]); + } const previousState = this.connectivityState; this.connectivityState = newState; switch (newState) { @@ -678,12 +700,16 @@ export class Subchannel { /* If no calls, channels, or subchannel pools have any more references to * this subchannel, we can be sure it will never be used again. */ if (this.callRefcount === 0 && this.refcount === 0) { - this.channelzTrace.addTrace('CT_INFO', 'Shutting down'); + if (this.channelzEnabled) { + this.channelzTrace.addTrace('CT_INFO', 'Shutting down'); + } this.transitionToState( [ConnectivityState.CONNECTING, ConnectivityState.READY], ConnectivityState.TRANSIENT_FAILURE ); - unregisterChannelzRef(this.channelzRef); + if (this.channelzEnabled) { + unregisterChannelzRef(this.channelzRef); + } } } @@ -805,34 +831,36 @@ export class Subchannel { ' with headers\n' + headersString ); - this.callTracker.addCallStarted(); - callStream.addStatusWatcher(status => { - if (status.code === Status.OK) { - this.callTracker.addCallSucceeded(); - } else { - this.callTracker.addCallFailed(); - } - }); const streamSession = this.session; - this.streamTracker.addCallStarted(); - callStream.addStreamEndWatcher(success => { - if (streamSession === this.session) { - if (success) { - this.streamTracker.addCallSucceeded(); + if (this.channelzEnabled) { + this.callTracker.addCallStarted(); + callStream.addStatusWatcher(status => { + if (status.code === Status.OK) { + this.callTracker.addCallSucceeded(); } else { - this.streamTracker.addCallFailed(); + this.callTracker.addCallFailed(); } - } - }); - callStream.attachHttp2Stream(http2Stream, this, extraFilters, { - addMessageSent: () => { - this.messagesSent += 1; - this.lastMessageSentTimestamp = new Date(); - }, - addMessageReceived: () => { - this.messagesReceived += 1; - } - }); + }); + this.streamTracker.addCallStarted(); + callStream.addStreamEndWatcher(success => { + if (streamSession === this.session) { + if (success) { + this.streamTracker.addCallSucceeded(); + } else { + this.streamTracker.addCallFailed(); + } + } + }); + callStream.attachHttp2Stream(http2Stream, this, extraFilters, { + addMessageSent: () => { + this.messagesSent += 1; + this.lastMessageSentTimestamp = new Date(); + }, + addMessageReceived: () => { + this.messagesReceived += 1; + } + }); + } } /** From 2a45b343d51ae8d3333914659dabdbb1d340334c Mon Sep 17 00:00:00 2001 From: Michael Lumish Date: Tue, 19 Oct 2021 14:20:37 -0700 Subject: [PATCH 3/7] grpc-js-xds: Use valid resources when NACKing messages --- .../grpc-js-xds/src/xds-stream-state/cds-state.ts | 14 +++++++++----- .../grpc-js-xds/src/xds-stream-state/eds-state.ts | 14 +++++++++----- .../grpc-js-xds/src/xds-stream-state/lds-state.ts | 14 +++++++++----- .../grpc-js-xds/src/xds-stream-state/rds-state.ts | 14 +++++++++----- 4 files changed, 36 insertions(+), 20 deletions(-) diff --git a/packages/grpc-js-xds/src/xds-stream-state/cds-state.ts b/packages/grpc-js-xds/src/xds-stream-state/cds-state.ts index 7720c567..5dae09fb 100644 --- a/packages/grpc-js-xds/src/xds-stream-state/cds-state.ts +++ b/packages/grpc-js-xds/src/xds-stream-state/cds-state.ts @@ -137,17 +137,21 @@ export class CdsState implements XdsStreamState { } handleResponses(responses: Cluster__Output[], isV2: boolean): string | null { + const validResponses: Cluster__Output[] = []; + let errorMessage: string | null = null; for (const message of responses) { - if (!this.validateResponse(message)) { + if (this.validateResponse(message)) { + validResponses.push(message); + } else { trace('CDS validation failed for message ' + JSON.stringify(message)); - return 'CDS Error: Cluster validation failed'; + errorMessage = 'CDS Error: Cluster validation failed'; } } - this.latestResponses = responses; + this.latestResponses = validResponses; this.latestIsV2 = isV2; const allEdsServiceNames: Set = new Set(); const allClusterNames: Set = new Set(); - for (const message of responses) { + for (const message of validResponses) { allClusterNames.add(message.name); const edsServiceName = message.eds_cluster_config?.service_name ?? ''; allEdsServiceNames.add( @@ -161,7 +165,7 @@ export class CdsState implements XdsStreamState { trace('Received CDS updates for cluster names ' + Array.from(allClusterNames)); this.handleMissingNames(allClusterNames); this.edsState.handleMissingNames(allEdsServiceNames); - return null; + return errorMessage; } reportStreamError(status: StatusObject): void { diff --git a/packages/grpc-js-xds/src/xds-stream-state/eds-state.ts b/packages/grpc-js-xds/src/xds-stream-state/eds-state.ts index 3861f4d2..7d28ed5f 100644 --- a/packages/grpc-js-xds/src/xds-stream-state/eds-state.ts +++ b/packages/grpc-js-xds/src/xds-stream-state/eds-state.ts @@ -146,16 +146,20 @@ export class EdsState implements XdsStreamState { } handleResponses(responses: ClusterLoadAssignment__Output[], isV2: boolean) { + const validResponses: ClusterLoadAssignment__Output[] = []; + let errorMessage: string | null = null; for (const message of responses) { - if (!this.validateResponse(message)) { + if (this.validateResponse(message)) { + validResponses.push(message); + } else { trace('EDS validation failed for message ' + JSON.stringify(message)); - return 'EDS Error: ClusterLoadAssignment validation failed'; + errorMessage = 'EDS Error: ClusterLoadAssignment validation failed'; } } - this.latestResponses = responses; + this.latestResponses = validResponses; this.latestIsV2 = isV2; const allClusterNames: Set = new Set(); - for (const message of responses) { + for (const message of validResponses) { allClusterNames.add(message.cluster_name); const watchers = this.watchers.get(message.cluster_name) ?? []; for (const watcher of watchers) { @@ -163,7 +167,7 @@ export class EdsState implements XdsStreamState { } } trace('Received EDS updates for cluster names ' + Array.from(allClusterNames)); - return null; + return errorMessage; } reportStreamError(status: StatusObject): void { diff --git a/packages/grpc-js-xds/src/xds-stream-state/lds-state.ts b/packages/grpc-js-xds/src/xds-stream-state/lds-state.ts index 10e71bab..5706e376 100644 --- a/packages/grpc-js-xds/src/xds-stream-state/lds-state.ts +++ b/packages/grpc-js-xds/src/xds-stream-state/lds-state.ts @@ -154,17 +154,21 @@ export class LdsState implements XdsStreamState { } handleResponses(responses: Listener__Output[], isV2: boolean): string | null { + const validResponses: Listener__Output[] = []; + let errorMessage: string | null = null; for (const message of responses) { - if (!this.validateResponse(message, isV2)) { + if (this.validateResponse(message, isV2)) { + validResponses.push(message); + } else { trace('LDS validation failed for message ' + JSON.stringify(message)); - return 'LDS Error: Route validation failed'; + errorMessage = 'LDS Error: Route validation failed'; } } - this.latestResponses = responses; + this.latestResponses = validResponses; this.latestIsV2 = isV2; const allTargetNames = new Set(); const allRouteConfigNames = new Set(); - for (const message of responses) { + for (const message of validResponses) { allTargetNames.add(message.name); const httpConnectionManager = decodeSingleResource(HTTP_CONNECTION_MANGER_TYPE_URL_V3, message.api_listener!.api_listener!.value); if (httpConnectionManager.rds) { @@ -178,7 +182,7 @@ export class LdsState implements XdsStreamState { trace('Received RDS response with route config names ' + Array.from(allTargetNames)); this.handleMissingNames(allTargetNames); this.rdsState.handleMissingNames(allRouteConfigNames); - return null; + return errorMessage; } reportStreamError(status: StatusObject): void { diff --git a/packages/grpc-js-xds/src/xds-stream-state/rds-state.ts b/packages/grpc-js-xds/src/xds-stream-state/rds-state.ts index ec7abe55..5a385432 100644 --- a/packages/grpc-js-xds/src/xds-stream-state/rds-state.ts +++ b/packages/grpc-js-xds/src/xds-stream-state/rds-state.ts @@ -183,16 +183,20 @@ export class RdsState implements XdsStreamState { } handleResponses(responses: RouteConfiguration__Output[], isV2: boolean): string | null { + const validResponses: RouteConfiguration__Output[] = []; + let errorMessage: string | null = null; for (const message of responses) { - if (!this.validateResponse(message, isV2)) { + if (this.validateResponse(message, isV2)) { + validResponses.push(message); + } else { trace('RDS validation failed for message ' + JSON.stringify(message)); - return 'RDS Error: Route validation failed'; + errorMessage = 'RDS Error: Route validation failed'; } } - this.latestResponses = responses; + this.latestResponses = validResponses; this.latestIsV2 = isV2; const allRouteConfigNames = new Set(); - for (const message of responses) { + for (const message of validResponses) { allRouteConfigNames.add(message.name); const watchers = this.watchers.get(message.name) ?? []; for (const watcher of watchers) { @@ -200,7 +204,7 @@ export class RdsState implements XdsStreamState { } } trace('Received RDS response with route config names ' + Array.from(allRouteConfigNames)); - return null; + return errorMessage; } reportStreamError(status: StatusObject): void { From 91ae2b44b165f4d77ed76deab5275df98230776b Mon Sep 17 00:00:00 2001 From: Michael Lumish Date: Mon, 25 Oct 2021 13:29:06 -0700 Subject: [PATCH 4/7] grpc-js: Handle undefined socket.localAddress --- packages/grpc-js/src/channelz.ts | 4 ++-- packages/grpc-js/src/server.ts | 2 +- packages/grpc-js/src/subchannel.ts | 2 +- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/packages/grpc-js/src/channelz.ts b/packages/grpc-js/src/channelz.ts index a1c85eb4..768efaa5 100644 --- a/packages/grpc-js/src/channelz.ts +++ b/packages/grpc-js/src/channelz.ts @@ -298,7 +298,7 @@ export interface TlsInfo { } export interface SocketInfo { - localAddress: SubchannelAddress; + localAddress: SubchannelAddress | null; remoteAddress: SubchannelAddress | null; security: TlsInfo | null; remoteName: string | null; @@ -629,7 +629,7 @@ function GetSocket(call: ServerUnaryCall Date: Thu, 4 Nov 2021 14:11:16 -0700 Subject: [PATCH 5/7] grpc-js: channelz: Fix algorithm for representing an IPv6 address in binary --- packages/grpc-js/src/channelz.ts | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/packages/grpc-js/src/channelz.ts b/packages/grpc-js/src/channelz.ts index 768efaa5..94d84d8f 100644 --- a/packages/grpc-js/src/channelz.ts +++ b/packages/grpc-js/src/channelz.ts @@ -392,6 +392,16 @@ export function unregisterChannelzRef(ref: ChannelRef | SubchannelRef | ServerRe } } +/** + * Parses a single section of an IPv6 address as two bytes + * @param addressSection A hexadecimal string of length up to 4 + * @returns The pair of bytes representing this address section + */ +function parseIPv6Section(addressSection: string): [number, number] { + const numberValue = Number.parseInt(addressSection, 16); + return [numberValue / 256 | 0, numberValue % 256]; +} + /** * Converts an IPv4 or IPv6 address from string representation to binary * representation @@ -412,8 +422,8 @@ function ipAddressStringToBuffer(ipAddress: string): Buffer | null { leftSection = ipAddress.substring(0, doubleColonIndex); rightSection = ipAddress.substring(doubleColonIndex + 2); } - const leftBuffer = Uint8Array.from(leftSection.split(':').map(segment => Number.parseInt(segment, 16))); - const rightBuffer = rightSection ? Uint8Array.from(rightSection.split(':').map(segment => Number.parseInt(segment, 16))) : new Uint8Array(); + const leftBuffer = Buffer.from(leftSection.split(':').map(segment => parseIPv6Section(segment)).flat()); + const rightBuffer = rightSection ? Buffer.from(rightSection.split(':').map(segment => parseIPv6Section(segment)).flat()) : Buffer.alloc(0); const middleBuffer = Buffer.alloc(16 - leftBuffer.length - rightBuffer.length, 0); return Buffer.concat([leftBuffer, middleBuffer, rightBuffer]); } else { From bb26dcfd1ef1019af887e4a76c72262bc26a5378 Mon Sep 17 00:00:00 2001 From: Michael Lumish Date: Fri, 5 Nov 2021 10:11:22 -0700 Subject: [PATCH 6/7] grpc-js: Fix handling of grpc.enable_channelz option --- packages/grpc-js/package.json | 2 +- packages/grpc-js/src/subchannel.ts | 11 +++++++-- packages/grpc-js/test/test-channelz.ts | 32 ++++++++++++++++++++++++++ 3 files changed, 42 insertions(+), 3 deletions(-) diff --git a/packages/grpc-js/package.json b/packages/grpc-js/package.json index 4acf948d..69d0662b 100644 --- a/packages/grpc-js/package.json +++ b/packages/grpc-js/package.json @@ -1,6 +1,6 @@ { "name": "@grpc/grpc-js", - "version": "1.4.2", + "version": "1.4.3", "description": "gRPC Library for Node - pure JS implementation", "homepage": "https://grpc.io/", "repository": "https://github.com/grpc/grpc-node/tree/master/packages/grpc-js", diff --git a/packages/grpc-js/src/subchannel.ts b/packages/grpc-js/src/subchannel.ts index 0b6465b2..87defa74 100644 --- a/packages/grpc-js/src/subchannel.ts +++ b/packages/grpc-js/src/subchannel.ts @@ -832,6 +832,7 @@ export class Subchannel { headersString ); const streamSession = this.session; + let statsTracker: SubchannelCallStatsTracker; if (this.channelzEnabled) { this.callTracker.addCallStarted(); callStream.addStatusWatcher(status => { @@ -851,7 +852,7 @@ export class Subchannel { } } }); - callStream.attachHttp2Stream(http2Stream, this, extraFilters, { + statsTracker = { addMessageSent: () => { this.messagesSent += 1; this.lastMessageSentTimestamp = new Date(); @@ -859,8 +860,14 @@ export class Subchannel { addMessageReceived: () => { this.messagesReceived += 1; } - }); + } + } else { + statsTracker = { + addMessageSent: () => {}, + addMessageReceived: () => {} + } } + callStream.attachHttp2Stream(http2Stream, this, extraFilters, statsTracker); } /** diff --git a/packages/grpc-js/test/test-channelz.ts b/packages/grpc-js/test/test-channelz.ts index 2cca780e..f14145c3 100644 --- a/packages/grpc-js/test/test-channelz.ts +++ b/packages/grpc-js/test/test-channelz.ts @@ -286,4 +286,36 @@ describe('Channelz', () => { }); }); }); +}); + +describe('Disabling channelz', () => { + let testServer: grpc.Server; + let testClient: ServiceClient; + beforeEach((done) => { + testServer = new grpc.Server({'grpc.enable_channelz': 0}); + testServer.addService(TestServiceClient.service, testServiceImpl); + testServer.bindAsync('localhost:0', grpc.ServerCredentials.createInsecure(), (error, port) => { + if (error) { + done(error); + return; + } + testServer.start(); + testClient = new TestServiceClient(`localhost:${port}`, grpc.credentials.createInsecure(), {'grpc.enable_channelz': 0}); + done(); + }); + }); + + afterEach(() => { + testClient.close(); + testServer.forceShutdown(); + }); + + it('Should still work', (done) => { + const deadline = new Date(); + deadline.setSeconds(deadline.getSeconds() + 1); + testClient.unary({}, {deadline}, (error: grpc.ServiceError, value: unknown) => { + assert.ifError(error); + done(); + }); + }); }); \ No newline at end of file From b1be84a0214ed140e3c58d8624c11c5756223376 Mon Sep 17 00:00:00 2001 From: Michael Lumish Date: Fri, 5 Nov 2021 11:54:15 -0700 Subject: [PATCH 7/7] Make IPv6 parsing code compatible with Node 10 --- packages/grpc-js/src/channelz.ts | 25 ++++++++++++++++++++----- 1 file changed, 20 insertions(+), 5 deletions(-) diff --git a/packages/grpc-js/src/channelz.ts b/packages/grpc-js/src/channelz.ts index 94d84d8f..14c94fd0 100644 --- a/packages/grpc-js/src/channelz.ts +++ b/packages/grpc-js/src/channelz.ts @@ -393,7 +393,7 @@ export function unregisterChannelzRef(ref: ChannelRef | SubchannelRef | ServerRe } /** - * Parses a single section of an IPv6 address as two bytes + * Parse a single section of an IPv6 address as two bytes * @param addressSection A hexadecimal string of length up to 4 * @returns The pair of bytes representing this address section */ @@ -402,6 +402,21 @@ function parseIPv6Section(addressSection: string): [number, number] { return [numberValue / 256 | 0, numberValue % 256]; } +/** + * Parse a chunk of an IPv6 address string to some number of bytes + * @param addressChunk Some number of segments of up to 4 hexadecimal + * characters each, joined by colons. + * @returns The list of bytes representing this address chunk + */ +function parseIPv6Chunk(addressChunk: string): number[] { + if (addressChunk === '') { + return []; + } + const bytePairs = addressChunk.split(':').map(section => parseIPv6Section(section)); + const result: number[] = []; + return result.concat(...bytePairs); +} + /** * Converts an IPv4 or IPv6 address from string representation to binary * representation @@ -413,17 +428,17 @@ function ipAddressStringToBuffer(ipAddress: string): Buffer | null { return Buffer.from(Uint8Array.from(ipAddress.split('.').map(segment => Number.parseInt(segment)))); } else if (isIPv6(ipAddress)) { let leftSection: string; - let rightSection: string | null; + let rightSection: string; const doubleColonIndex = ipAddress.indexOf('::'); if (doubleColonIndex === -1) { leftSection = ipAddress; - rightSection = null; + rightSection = ''; } else { leftSection = ipAddress.substring(0, doubleColonIndex); rightSection = ipAddress.substring(doubleColonIndex + 2); } - const leftBuffer = Buffer.from(leftSection.split(':').map(segment => parseIPv6Section(segment)).flat()); - const rightBuffer = rightSection ? Buffer.from(rightSection.split(':').map(segment => parseIPv6Section(segment)).flat()) : Buffer.alloc(0); + const leftBuffer = Buffer.from(parseIPv6Chunk(leftSection)); + const rightBuffer = Buffer.from(parseIPv6Chunk(rightSection)); const middleBuffer = Buffer.alloc(16 - leftBuffer.length - rightBuffer.length, 0); return Buffer.concat([leftBuffer, middleBuffer, rightBuffer]); } else {