From c0f7afec327ab2523bd9970a280da7f49c5fccce Mon Sep 17 00:00:00 2001 From: Kelvin Jin Date: Thu, 12 Apr 2018 11:59:34 -0700 Subject: [PATCH] js: only listen for channel connect event once --- packages/grpc-js-core/src/call.ts | 2 +- packages/grpc-js-core/src/channel.ts | 40 +++++++++++++++++++++++----- packages/grpc-js-core/src/client.ts | 6 +++++ 3 files changed, 40 insertions(+), 8 deletions(-) diff --git a/packages/grpc-js-core/src/call.ts b/packages/grpc-js-core/src/call.ts index af305b24..4344dfeb 100644 --- a/packages/grpc-js-core/src/call.ts +++ b/packages/grpc-js-core/src/call.ts @@ -87,13 +87,13 @@ function setUpReadableStream( stream.push(null); }); call.on('status', (status: StatusObject) => { - stream.emit('status', status); if (status.code !== Status.OK) { const statusName = _.invert(Status)[status.code]; const message: string = `${status.code} ${statusName}: ${status.details}`; const error: ServiceError = Object.assign(new Error(status.details), status); stream.emit('error', error); } + stream.emit('status', status); }); call.pause(); } diff --git a/packages/grpc-js-core/src/channel.ts b/packages/grpc-js-core/src/channel.ts index 8e8e38e7..e710b733 100644 --- a/packages/grpc-js-core/src/channel.ts +++ b/packages/grpc-js-core/src/channel.ts @@ -85,6 +85,8 @@ export class Http2Channel extends EventEmitter implements Channel { private readonly target: url.URL; private readonly defaultAuthority: string; private connectivityState: ConnectivityState = ConnectivityState.IDLE; + // Helper Promise object only used in the implementation of connect(). + private connecting: Promise|null = null; /* For now, we have up to one subchannel, which will exist as long as we are * connecting or trying to connect */ private subChannel: http2.ClientHttp2Session|null = null; @@ -127,6 +129,7 @@ export class Http2Channel extends EventEmitter implements Channel { this.subChannel.removeListener('connect', this.subChannelConnectCallback); this.subChannel.removeListener('close', this.subChannelCloseCallback); this.subChannel = null; + this.emit('shutdown'); clearTimeout(this.backoffTimerId); } break; @@ -279,15 +282,38 @@ export class Http2Channel extends EventEmitter implements Channel { return stream; } + /** + * Attempts to connect, returning a Promise that resolves when the connection + * is successful, or rejects if the channel is shut down. + */ connect(): Promise { - return new Promise((resolve) => { - this.transitionToState([ConnectivityState.IDLE], ConnectivityState.CONNECTING); - if (this.connectivityState === ConnectivityState.READY) { - setImmediate(resolve); - } else { - this.once('connect', resolve); + if (this.connectivityState === ConnectivityState.READY) { + return Promise.resolve(); + } else if (this.connectivityState === ConnectivityState.SHUTDOWN) { + return Promise.reject(new Error('Channel has been shut down')); + } else { + // In effect, this.connecting is only assigned upon the first attempt to + // transition from IDLE to CONNECTING, so this condition could have also + // been (connectivityState === IDLE). + if (!this.connecting) { + this.connecting = new Promise((resolve, reject) => { + this.transitionToState([ConnectivityState.IDLE], ConnectivityState.CONNECTING); + const onConnect = () => { + this.connecting = null; + this.removeListener('shutdown', onShutdown); + resolve(); + }; + const onShutdown = () => { + this.connecting = null; + this.removeListener('connect', onConnect); + reject(new Error('Channel has been shut down')); + }; + this.once('connect', onConnect); + this.once('shutdown', onShutdown); + }); } - }); + return this.connecting; + } } getConnectivityState(): ConnectivityState { diff --git a/packages/grpc-js-core/src/client.ts b/packages/grpc-js-core/src/client.ts index 25fa7bc8..b120fcc7 100644 --- a/packages/grpc-js-core/src/client.ts +++ b/packages/grpc-js-core/src/client.ts @@ -42,6 +42,12 @@ export class Client { clearTimeout(timer); } cb(null); + }, (err: Error) => { + // Rejection occurs if channel is shut down first. + if (timer) { + clearTimeout(timer); + } + cb(err); }); if (deadline !== Infinity) { let timeout: number;