diff --git a/doc/environment_variables.md b/doc/environment_variables.md index f2a32294..0237dff7 100644 --- a/doc/environment_variables.md +++ b/doc/environment_variables.md @@ -36,7 +36,10 @@ can be set. - `server` - Traces high-level server events - `server_call` - Traces server handling of individual requests - `subchannel` - Traces subchannel connectivity state and errors - - `subchannel_refcount` - Traces subchannel refcount changes + - `subchannel_refcount` - Traces subchannel refcount changes. Includes per-call logs. + - `subchannel_flowctrl` - Traces HTTP/2 flow control. Includes per-call logs. + - `subchannel_internals` - Traces HTTP/2 session state. Includes per-call logs. + - `channel_stacktrace` - Traces channel construction events with stack traces. The following tracers are added by the `@grpc/grpc-js-xds` library: - `cds_balancer` - Traces the CDS load balancing policy diff --git a/packages/grpc-js-xds/README.md b/packages/grpc-js-xds/README.md index 84d29150..1ac235c4 100644 --- a/packages/grpc-js-xds/README.md +++ b/packages/grpc-js-xds/README.md @@ -26,4 +26,5 @@ const client = new MyServiceClient('xds:///example.com:123'); - [xDS v3 API](https://github.com/grpc/proposal/blob/master/A30-xds-v3.md) - [xDS Timeouts](https://github.com/grpc/proposal/blob/master/A31-xds-timeout-support-and-config-selector.md) - [xDS Circuit Breaking](https://github.com/grpc/proposal/blob/master/A32-xds-circuit-breaking.md) - - [xDS Client-Side Fault Injection](https://github.com/grpc/proposal/blob/master/A33-Fault-Injection.md) \ No newline at end of file + - [xDS Client-Side Fault Injection](https://github.com/grpc/proposal/blob/master/A33-Fault-Injection.md) + - [Client Status Discovery Service](https://github.com/grpc/proposal/blob/master/A40-csds-support.md) \ No newline at end of file diff --git a/packages/grpc-js-xds/package.json b/packages/grpc-js-xds/package.json index 3f1ddb5e..b4e0a49d 100644 --- a/packages/grpc-js-xds/package.json +++ b/packages/grpc-js-xds/package.json @@ -1,6 +1,6 @@ { "name": "@grpc/grpc-js-xds", - "version": "1.4.0", + "version": "1.5.2", "description": "Plugin for @grpc/grpc-js. Adds the xds:// URL scheme and associated features.", "main": "build/src/index.js", "scripts": { @@ -47,7 +47,7 @@ "re2-wasm": "^1.0.1" }, "peerDependencies": { - "@grpc/grpc-js": "~1.4.0" + "@grpc/grpc-js": "~1.5.0" }, "engines": { "node": ">=10.10.0" @@ -55,6 +55,7 @@ "files": [ "src/**/*.ts", "build/src/**/*.{js,d.ts,js.map}", + "deps/envoy-api/envoy/admin/v3/**/*.proto", "deps/envoy-api/envoy/api/v2/**/*.proto", "deps/envoy-api/envoy/config/**/*.proto", "deps/envoy-api/envoy/service/**/*.proto", diff --git a/packages/grpc-js/README.md b/packages/grpc-js/README.md index cda4fec3..71ec937e 100644 --- a/packages/grpc-js/README.md +++ b/packages/grpc-js/README.md @@ -56,6 +56,8 @@ Many channel arguments supported in `grpc` are not supported in `@grpc/grpc-js`. - `grpc.max_send_message_length` - `grpc.max_receive_message_length` - `grpc.enable_http_proxy` + - `grpc.default_compression_algorithm` + - `grpc.enable_channelz` - `grpc-node.max_session_memory` - `channelOverride` - `channelFactoryOverride` diff --git a/packages/grpc-js/package.json b/packages/grpc-js/package.json index 510eac61..50896e96 100644 --- a/packages/grpc-js/package.json +++ b/packages/grpc-js/package.json @@ -1,6 +1,6 @@ { "name": "@grpc/grpc-js", - "version": "1.4.6", + "version": "1.5.10", "description": "gRPC Library for Node - pure JS implementation", "homepage": "https://grpc.io/", "repository": "https://github.com/grpc/grpc-node/tree/master/packages/grpc-js", diff --git a/packages/grpc-js/src/call-stream.ts b/packages/grpc-js/src/call-stream.ts index f2e045ae..4eff39eb 100644 --- a/packages/grpc-js/src/call-stream.ts +++ b/packages/grpc-js/src/call-stream.ts @@ -312,6 +312,13 @@ export class Http2CallStream implements Call { const filteredStatus = this.filterStack.receiveTrailers( this.finalStatus! ); + this.trace( + 'ended with status: code=' + + filteredStatus.code + + ' details="' + + filteredStatus.details + + '"' + ); this.statusWatchers.forEach(watcher => watcher(filteredStatus)); /* We delay the actual action of bubbling up the status to insulate the * cleanup code in this class from any errors that may be thrown in the @@ -346,13 +353,6 @@ export class Http2CallStream implements Call { /* If the status is OK and a new status comes in (e.g. from a * deserialization failure), that new status takes priority */ if (this.finalStatus === null || this.finalStatus.code === Status.OK) { - this.trace( - 'ended with status: code=' + - status.code + - ' details="' + - status.details + - '"' - ); this.finalStatus = status; this.maybeOutputStatus(); } @@ -795,6 +795,10 @@ export class Http2CallStream implements Call { this.filterStack.push(extraFilters); } + getCallNumber() { + return this.callNumber; + } + startRead() { /* If the stream has ended with an error, we should not emit any more * messages and we should communicate that the stream has ended */ diff --git a/packages/grpc-js/src/channel-options.ts b/packages/grpc-js/src/channel-options.ts index e1c16d12..68831722 100644 --- a/packages/grpc-js/src/channel-options.ts +++ b/packages/grpc-js/src/channel-options.ts @@ -36,6 +36,9 @@ export interface ChannelOptions { 'grpc.max_send_message_length'?: number; 'grpc.max_receive_message_length'?: number; 'grpc.enable_http_proxy'?: number; + /* http_connect_target and http_connect_creds are used for passing data + * around internally, and should not be documented as public-facing options + */ 'grpc.http_connect_target'?: string; 'grpc.http_connect_creds'?: string; 'grpc.default_compression_algorithm'?: CompressionAlgorithms; diff --git a/packages/grpc-js/src/channel.ts b/packages/grpc-js/src/channel.ts index 8b9275e9..635b52d6 100644 --- a/packages/grpc-js/src/channel.ts +++ b/packages/grpc-js/src/channel.ts @@ -336,6 +336,8 @@ export class ChannelImplementation implements Channel { new CompressionFilterFactory(this, this.options), ]); this.trace('Channel constructed with options ' + JSON.stringify(options, undefined, 2)); + const error = new Error(); + trace(LogVerbosity.DEBUG, 'channel_stacktrace', '(' + this.channelzRef.id + ') ' + 'Channel constructed \n' + error.stack?.substring(error.stack.indexOf('\n')+1)); } private getChannelzInfo(): ChannelInfo { @@ -405,11 +407,16 @@ export class ChannelImplementation implements Channel { metadata: callMetadata, extraPickInfo: callConfig.pickInformation, }); + const subchannelString = pickResult.subchannel ? + '(' + pickResult.subchannel.getChannelzRef().id + ') ' + pickResult.subchannel.getAddress() : + '' + pickResult.subchannel; this.trace( - 'Pick result: ' + + 'Pick result for call [' + + callStream.getCallNumber() + + ']: ' + PickResultType[pickResult.pickResultType] + ' subchannel: ' + - pickResult.subchannel?.getAddress() + + subchannelString + ' status: ' + pickResult.status?.code + ' ' + @@ -434,7 +441,7 @@ export class ChannelImplementation implements Channel { log( LogVerbosity.ERROR, 'Error: COMPLETE pick result subchannel ' + - pickResult.subchannel!.getAddress() + + subchannelString + ' has state ' + ConnectivityState[pickResult.subchannel!.getConnectivityState()] ); @@ -462,9 +469,9 @@ export class ChannelImplementation implements Channel { callConfig.onCommitted?.(); pickResult.onCallStarted?.(); } catch (error) { - if ( - (error as NodeJS.ErrnoException).code === - 'ERR_HTTP2_GOAWAY_SESSION' + const errorCode = (error as NodeJS.ErrnoException).code; + if (errorCode === 'ERR_HTTP2_GOAWAY_SESSION' || + errorCode === 'ERR_HTTP2_INVALID_SESSION' ) { /* An error here indicates that something went wrong with * the picked subchannel's http2 stream right before we @@ -481,7 +488,7 @@ export class ChannelImplementation implements Channel { * tryPick */ this.trace( 'Failed to start call on picked subchannel ' + - pickResult.subchannel!.getAddress() + + subchannelString + ' with error ' + (error as Error).message + '. Retrying pick', @@ -491,7 +498,7 @@ export class ChannelImplementation implements Channel { } else { this.trace( 'Failed to start call on picked subchanel ' + - pickResult.subchannel!.getAddress() + + subchannelString + ' with error ' + (error as Error).message + '. Ending call', @@ -510,7 +517,7 @@ export class ChannelImplementation implements Channel { * block above */ this.trace( 'Picked subchannel ' + - pickResult.subchannel!.getAddress() + + subchannelString + ' has state ' + ConnectivityState[subchannelState] + ' after metadata filters. Retrying pick', diff --git a/packages/grpc-js/src/load-balancer-child-handler.ts b/packages/grpc-js/src/load-balancer-child-handler.ts index 50708cf9..0591c92f 100644 --- a/packages/grpc-js/src/load-balancer-child-handler.ts +++ b/packages/grpc-js/src/load-balancer-child-handler.ts @@ -125,9 +125,9 @@ export class ChildLoadBalancerHandler implements LoadBalancer { } exitIdle(): void { if (this.currentChild) { - this.currentChild.resetBackoff(); + this.currentChild.exitIdle(); if (this.pendingChild) { - this.pendingChild.resetBackoff(); + this.pendingChild.exitIdle(); } } } diff --git a/packages/grpc-js/src/load-balancer-pick-first.ts b/packages/grpc-js/src/load-balancer-pick-first.ts index c7033f58..884af50b 100644 --- a/packages/grpc-js/src/load-balancer-pick-first.ts +++ b/packages/grpc-js/src/load-balancer-pick-first.ts @@ -449,11 +449,15 @@ export class PickFirstLoadBalancer implements LoadBalancer { destroy() { this.resetSubchannelList(); if (this.currentPick !== null) { - this.currentPick.unref(); - this.currentPick.removeConnectivityStateListener( + /* Unref can cause a state change, which can cause a change in the value + * of this.currentPick, so we hold a local reference to make sure that + * does not impact this function. */ + const currentPick = this.currentPick; + currentPick.unref(); + currentPick.removeConnectivityStateListener( this.pickedSubchannelStateListener ); - this.channelControlHelper.removeChannelzChild(this.currentPick.getChannelzRef()); + this.channelControlHelper.removeChannelzChild(currentPick.getChannelzRef()); } } diff --git a/packages/grpc-js/src/logging.ts b/packages/grpc-js/src/logging.ts index 549cd860..ec845c1a 100644 --- a/packages/grpc-js/src/logging.ts +++ b/packages/grpc-js/src/logging.ts @@ -108,10 +108,12 @@ export function trace( tracer: string, text: string ): void { - if ( - !disabledTracers.has(tracer) && - (allEnabled || enabledTracers.has(tracer)) - ) { + if (isTracerEnabled(tracer)) { log(severity, new Date().toISOString() + ' | ' + tracer + ' | ' + text); } } + +export function isTracerEnabled(tracer: string): boolean { + return !disabledTracers.has(tracer) && + (allEnabled || enabledTracers.has(tracer)); +} diff --git a/packages/grpc-js/src/resolver-dns.ts b/packages/grpc-js/src/resolver-dns.ts index 9077228b..8ad24ed0 100644 --- a/packages/grpc-js/src/resolver-dns.ts +++ b/packages/grpc-js/src/resolver-dns.ts @@ -32,6 +32,7 @@ import { SubchannelAddress, TcpSubchannelAddress } from './subchannel-address'; import { GrpcUri, uriToString, splitHostPort } from './uri-parser'; import { isIPv6, isIPv4 } from 'net'; import { ChannelOptions } from './channel-options'; +import { BackoffOptions, BackoffTimeout } from './backoff-timeout'; const TRACER_NAME = 'dns_resolver'; @@ -85,6 +86,8 @@ class DnsResolver implements Resolver { private latestServiceConfigError: StatusObject | null = null; private percentage: number; private defaultResolutionError: StatusObject; + private backoff: BackoffTimeout; + private continueResolving = false; constructor( private target: GrpcUri, private listener: ResolverListener, @@ -119,6 +122,18 @@ class DnsResolver implements Resolver { details: `Name resolution failed for target ${uriToString(this.target)}`, metadata: new Metadata(), }; + + const backoffOptions: BackoffOptions = { + initialDelay: channelOptions['grpc.initial_reconnect_backoff_ms'], + maxDelay: channelOptions['grpc.max_reconnect_backoff_ms'], + }; + + this.backoff = new BackoffTimeout(() => { + if (this.continueResolving) { + this.startResolutionWithBackoff(); + } + }, backoffOptions); + this.backoff.unref(); } /** @@ -129,6 +144,7 @@ class DnsResolver implements Resolver { if (this.ipResult !== null) { trace('Returning IP address for target ' + uriToString(this.target)); setImmediate(() => { + this.backoff.reset(); this.listener.onSuccessfulResolution( this.ipResult!, null, @@ -140,6 +156,7 @@ class DnsResolver implements Resolver { return; } if (this.dnsHostname === null) { + trace('Failed to parse DNS address ' + uriToString(this.target)); setImmediate(() => { this.listener.onError({ code: Status.UNAVAILABLE, @@ -148,6 +165,7 @@ class DnsResolver implements Resolver { }); }); } else { + trace('Looking up DNS hostname ' + this.dnsHostname); /* We clear out latestLookupResult here to ensure that it contains the * latest result since the last time we started resolving. That way, the * TXT resolution handler can use it, but only if it finishes second. We @@ -164,6 +182,7 @@ class DnsResolver implements Resolver { this.pendingLookupPromise.then( (addressList) => { this.pendingLookupPromise = null; + this.backoff.reset(); const ip4Addresses: dns.LookupAddress[] = addressList.filter( (addr) => addr.family === 4 ); @@ -263,10 +282,21 @@ class DnsResolver implements Resolver { } } - updateResolution() { - trace('Resolution update requested for target ' + uriToString(this.target)); - if (this.pendingLookupPromise === null) { + private startResolutionWithBackoff() { this.startResolution(); + this.backoff.runOnce(); + } + + updateResolution() { + /* If there is a pending lookup, just let it finish. Otherwise, if the + * backoff timer is running, do another lookup when it ends, and if not, + * do another lookup immeidately. */ + if (this.pendingLookupPromise === null) { + if (this.backoff.isRunning()) { + this.continueResolving = true; + } else { + this.startResolutionWithBackoff(); + } } } diff --git a/packages/grpc-js/src/server.ts b/packages/grpc-js/src/server.ts index e0074807..829941fa 100644 --- a/packages/grpc-js/src/server.ts +++ b/packages/grpc-js/src/server.ts @@ -371,6 +371,13 @@ export class Server { creds._getSettings()! ); http2Server = http2.createSecureServer(secureServerOptions); + http2Server.on('secureConnection', (socket: TLSSocket) => { + /* These errors need to be handled by the user of Http2SecureServer, + * according to https://github.com/nodejs/node/issues/35824 */ + socket.on('error', (e: Error) => { + this.trace('An incoming TLS connection closed with error: ' + e.message); + }); + }); } else { http2Server = http2.createServer(serverOptions); } @@ -423,27 +430,36 @@ export class Server { port: boundAddress.port } } - const channelzRef = registerChannelzSocket(subchannelAddressToString(boundSubchannelAddress), () => { - return { - localAddress: boundSubchannelAddress, - remoteAddress: null, - security: null, - remoteName: null, - streamsStarted: 0, - streamsSucceeded: 0, - streamsFailed: 0, - messagesSent: 0, - messagesReceived: 0, - keepAlivesSent: 0, - lastLocalStreamCreatedTimestamp: null, - lastRemoteStreamCreatedTimestamp: null, - lastMessageSentTimestamp: null, - lastMessageReceivedTimestamp: null, - localFlowControlWindow: null, - remoteFlowControlWindow: null + let channelzRef: SocketRef; + if (this.channelzEnabled) { + channelzRef = registerChannelzSocket(subchannelAddressToString(boundSubchannelAddress), () => { + return { + localAddress: boundSubchannelAddress, + remoteAddress: null, + security: null, + remoteName: null, + streamsStarted: 0, + streamsSucceeded: 0, + streamsFailed: 0, + messagesSent: 0, + messagesReceived: 0, + keepAlivesSent: 0, + lastLocalStreamCreatedTimestamp: null, + lastRemoteStreamCreatedTimestamp: null, + lastMessageSentTimestamp: null, + lastMessageReceivedTimestamp: null, + localFlowControlWindow: null, + remoteFlowControlWindow: null + }; + }); + this.listenerChildrenTracker.refChild(channelzRef); + } else { + channelzRef = { + kind: 'socket', + id: -1, + name: '' }; - }); - this.listenerChildrenTracker.refChild(channelzRef); + } this.http2ServerList.push({server: http2Server, channelzRef: channelzRef}); this.trace('Successfully bound ' + subchannelAddressToString(boundSubchannelAddress)); resolve('port' in boundSubchannelAddress ? boundSubchannelAddress.port : portNum); @@ -492,27 +508,36 @@ export class Server { host: boundAddress.address, port: boundAddress.port }; - const channelzRef = registerChannelzSocket(subchannelAddressToString(boundSubchannelAddress), () => { - return { - localAddress: boundSubchannelAddress, - remoteAddress: null, - security: null, - remoteName: null, - streamsStarted: 0, - streamsSucceeded: 0, - streamsFailed: 0, - messagesSent: 0, - messagesReceived: 0, - keepAlivesSent: 0, - lastLocalStreamCreatedTimestamp: null, - lastRemoteStreamCreatedTimestamp: null, - lastMessageSentTimestamp: null, - lastMessageReceivedTimestamp: null, - localFlowControlWindow: null, - remoteFlowControlWindow: null + let channelzRef: SocketRef; + if (this.channelzEnabled) { + channelzRef = registerChannelzSocket(subchannelAddressToString(boundSubchannelAddress), () => { + return { + localAddress: boundSubchannelAddress, + remoteAddress: null, + security: null, + remoteName: null, + streamsStarted: 0, + streamsSucceeded: 0, + streamsFailed: 0, + messagesSent: 0, + messagesReceived: 0, + keepAlivesSent: 0, + lastLocalStreamCreatedTimestamp: null, + lastRemoteStreamCreatedTimestamp: null, + lastMessageSentTimestamp: null, + lastMessageReceivedTimestamp: null, + localFlowControlWindow: null, + remoteFlowControlWindow: null + }; + }); + this.listenerChildrenTracker.refChild(channelzRef); + } else { + channelzRef = { + kind: 'socket', + id: -1, + name: '' }; - }); - this.listenerChildrenTracker.refChild(channelzRef); + } this.http2ServerList.push({server: http2Server, channelzRef: channelzRef}); this.trace('Successfully bound ' + subchannelAddressToString(boundSubchannelAddress)); resolve( @@ -592,8 +617,10 @@ export class Server { for (const {server: http2Server, channelzRef: ref} of this.http2ServerList) { if (http2Server.listening) { http2Server.close(() => { - this.listenerChildrenTracker.unrefChild(ref); - unregisterChannelzRef(ref); + if (this.channelzEnabled) { + this.listenerChildrenTracker.unrefChild(ref); + unregisterChannelzRef(ref); + } }); } } @@ -609,7 +636,9 @@ export class Server { session.destroy(http2.constants.NGHTTP2_CANCEL as any); }); this.sessions.clear(); - unregisterChannelzRef(this.channelzRef); + if (this.channelzEnabled) { + unregisterChannelzRef(this.channelzRef); + } } register( @@ -658,7 +687,9 @@ export class Server { tryShutdown(callback: (error?: Error) => void): void { const wrappedCallback = (error?: Error) => { - unregisterChannelzRef(this.channelzRef); + if (this.channelzEnabled) { + unregisterChannelzRef(this.channelzRef); + } callback(error); }; let pendingChecks = 0; @@ -678,8 +709,10 @@ export class Server { if (http2Server.listening) { pendingChecks++; http2Server.close(() => { - this.listenerChildrenTracker.unrefChild(ref); - unregisterChannelzRef(ref); + if (this.channelzEnabled) { + this.listenerChildrenTracker.unrefChild(ref); + unregisterChannelzRef(ref); + } maybeCallback(); }); } @@ -720,8 +753,10 @@ export class Server { 'stream', (stream: http2.ServerHttp2Stream, headers: http2.IncomingHttpHeaders) => { const channelzSessionInfo = this.sessions.get(stream.session as http2.ServerHttp2Session); - this.callTracker.addCallStarted(); - channelzSessionInfo?.streamTracker.addCallStarted(); + if (this.channelzEnabled) { + this.callTracker.addCallStarted(); + channelzSessionInfo?.streamTracker.addCallStarted(); + } const contentType = headers[http2.constants.HTTP2_HEADER_CONTENT_TYPE]; if ( @@ -736,7 +771,9 @@ export class Server { { endStream: true } ); this.callTracker.addCallFailed(); - channelzSessionInfo?.streamTracker.addCallFailed(); + if (this.channelzEnabled) { + channelzSessionInfo?.streamTracker.addCallFailed(); + } return; } @@ -779,7 +816,7 @@ export class Server { this.callTracker.addCallFailed(); } }); - if (channelzSessionInfo) { + if (this.channelzEnabled && channelzSessionInfo) { call.once('streamEnd', (success: boolean) => { if (success) { channelzSessionInfo.streamTracker.addCallSucceeded(); @@ -834,8 +871,10 @@ export class Server { } catch (err) { if (!call) { call = new Http2ServerCallStream(stream, null!, this.options); - this.callTracker.addCallFailed(); - channelzSessionInfo?.streamTracker.addCallFailed() + if (this.channelzEnabled) { + this.callTracker.addCallFailed(); + channelzSessionInfo?.streamTracker.addCallFailed() + } } if (err.code === undefined) { @@ -853,7 +892,16 @@ export class Server { return; } - const channelzRef = registerChannelzSocket(session.socket.remoteAddress ?? 'unknown', this.getChannelzSessionInfoGetter(session)); + let channelzRef: SocketRef; + if (this.channelzEnabled) { + channelzRef = registerChannelzSocket(session.socket.remoteAddress ?? 'unknown', this.getChannelzSessionInfoGetter(session)); + } else { + channelzRef = { + kind: 'socket', + id: -1, + name: '' + } + } const channelzSessionInfo: ChannelzSessionInfo = { ref: channelzRef, diff --git a/packages/grpc-js/src/subchannel.ts b/packages/grpc-js/src/subchannel.ts index 3a1f7048..800274f9 100644 --- a/packages/grpc-js/src/subchannel.ts +++ b/packages/grpc-js/src/subchannel.ts @@ -42,6 +42,7 @@ import { ConnectivityStateListener } from './subchannel-interface'; const clientVersion = require('../../package.json').version; const TRACER_NAME = 'subchannel'; +const FLOW_CONTROL_TRACER_NAME = 'subchannel_flowctrl'; const MIN_CONNECT_TIMEOUT_MS = 20000; const INITIAL_BACKOFF_MS = 1000; @@ -319,6 +320,14 @@ export class Subchannel { logging.trace(LogVerbosity.DEBUG, 'subchannel_refcount', '(' + this.channelzRef.id + ') ' + this.subchannelAddressString + ' ' + text); } + private flowControlTrace(text: string): void { + logging.trace(LogVerbosity.DEBUG, FLOW_CONTROL_TRACER_NAME, '(' + this.channelzRef.id + ') ' + this.subchannelAddressString + ' ' + text); + } + + private internalsTrace(text: string): void { + logging.trace(LogVerbosity.DEBUG, 'subchannel_internals', '(' + this.channelzRef.id + ') ' + this.subchannelAddressString + ' ' + text); + } + private handleBackoffTimer() { if (this.continueConnecting) { this.transitionToState( @@ -550,6 +559,24 @@ export class Subchannel { (error as Error).message ); }); + if (logging.isTracerEnabled(TRACER_NAME)) { + session.on('remoteSettings', (settings: http2.Settings) => { + this.trace( + 'new settings received' + + (this.session !== session ? ' on the old connection' : '') + + ': ' + + JSON.stringify(settings) + ); + }); + session.on('localSettings', (settings: http2.Settings) => { + this.trace( + 'local settings acknowledged by remote' + + (this.session !== session ? ' on the old connection' : '') + + ': ' + + JSON.stringify(settings) + ); + }); + } } private startConnectingInternal() { @@ -637,9 +664,15 @@ export class Subchannel { switch (newState) { case ConnectivityState.READY: this.stopBackoff(); - this.session!.socket.once('close', () => { - for (const listener of this.disconnectListeners) { - listener(); + const session = this.session!; + session.socket.once('close', () => { + if (this.session === session) { + this.transitionToState( + [ConnectivityState.READY], + ConnectivityState.TRANSIENT_FAILURE); + for (const listener of this.disconnectListeners) { + listener(); + } } }); if (this.keepaliveWithoutCalls) { @@ -819,13 +852,26 @@ export class Subchannel { logging.trace( LogVerbosity.DEBUG, 'call_stream', - 'Starting stream on subchannel ' + + 'Starting stream [' + callStream.getCallNumber() + '] on subchannel ' + '(' + this.channelzRef.id + ') ' + this.subchannelAddressString + ' with headers\n' + headersString ); + this.flowControlTrace( + 'local window size: ' + + this.session!.state.localWindowSize + + ' remote window size: ' + + this.session!.state.remoteWindowSize + ); const streamSession = this.session; + this.internalsTrace( + 'session.closed=' + + streamSession!.closed + + ' session.destroyed=' + + streamSession!.destroyed + + ' session.socket.destroyed=' + + streamSession!.socket.destroyed); let statsTracker: SubchannelCallStatsTracker; if (this.channelzEnabled) { this.callTracker.addCallStarted(); diff --git a/packages/grpc-js/test/test-local-subchannel-pool.ts b/packages/grpc-js/test/test-local-subchannel-pool.ts new file mode 100644 index 00000000..081b2d3d --- /dev/null +++ b/packages/grpc-js/test/test-local-subchannel-pool.ts @@ -0,0 +1,73 @@ +/* + * Copyright 2022 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +import * as assert from 'assert'; +import * as path from 'path'; +import * as grpc from '../src'; +import { sendUnaryData, Server, ServerCredentials, ServerUnaryCall, ServiceClientConstructor, ServiceError } from "../src"; +import { loadProtoFile } from './common'; + +const protoFile = path.join(__dirname, 'fixtures', 'echo_service.proto'); +const echoService = loadProtoFile(protoFile) + .EchoService as ServiceClientConstructor; + +describe('Local subchannel pool', () => { + let server: Server; + let serverPort: number; + + before(done => { + + server = new Server(); + server.addService(echoService.service, { + echo(call: ServerUnaryCall, callback: sendUnaryData) { + callback(null, call.request); + }, + }); + + server.bindAsync( + 'localhost:0', + ServerCredentials.createInsecure(), + (err, port) => { + assert.ifError(err); + serverPort = port; + server.start(); + done(); + } + ); + }); + + after(done => { + server.tryShutdown(done); + }); + + it('should complete the client lifecycle without error', done => { + const client = new echoService( + `localhost:${serverPort}`, + grpc.credentials.createInsecure(), + {'grpc.use_local_subchannel_pool': 1} + ); + client.echo( + { value: 'test value', value2: 3 }, + (error: ServiceError, response: any) => { + assert.ifError(error); + assert.deepStrictEqual(response, { value: 'test value', value2: 3 }); + client.close(); + done(); + } + ); + }); +}); \ No newline at end of file diff --git a/run-tests.sh b/run-tests.sh index e62cbd88..4bcb388b 100755 --- a/run-tests.sh +++ b/run-tests.sh @@ -75,6 +75,8 @@ do # npm test calls nyc gulp test npm test || FAILED="true" + + ./test/distrib/run-distrib-test.sh || FAILED="true" done set +ex diff --git a/test/distrib/distrib-test.js b/test/distrib/distrib-test.js new file mode 100644 index 00000000..708c93eb --- /dev/null +++ b/test/distrib/distrib-test.js @@ -0,0 +1,22 @@ +/* + * Copyright 2022 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +const grpcJs = require('@grpc/grpc-js'); + +const grpcJsXds = require('@grpc/grpc-js-xds'); + +const protoLoader = require('@grpc/proto-loader'); diff --git a/test/distrib/run-distrib-test.sh b/test/distrib/run-distrib-test.sh new file mode 100755 index 00000000..e2ed0853 --- /dev/null +++ b/test/distrib/run-distrib-test.sh @@ -0,0 +1,33 @@ +#!/bin/bash +# Copyright 2022 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +set -ex + +cd $(dirname $0) +base=$(pwd) + +cd ../../packages/grpc-js +npm pack +cd ../grpc-js-xds +npm pack +cd ../proto-loader +npm pack + +cd $base +npm install ../../packages/grpc-js/grpc-grpc-js-*.tgz +npm install ../../packages/grpc-js-xds/grpc-grpc-js-xds-*.tgz +npm install ../../packages/proto-loader/grpc-proto-loader-*.tgz + +node ./distrib-test.js