diff --git a/packages/grpc-js-core/gulpfile.ts b/packages/grpc-js-core/gulpfile.ts index b75350cc..72434d5b 100644 --- a/packages/grpc-js-core/gulpfile.ts +++ b/packages/grpc-js-core/gulpfile.ts @@ -71,7 +71,7 @@ gulp.task('copy-test-fixtures', 'Copy test fixtures.', () => { /** * Transpiles src/ and test/, and then runs all tests. */ -gulp.task('test', 'Runs all tests.', ['copy-test-fixtures'], () => { +gulp.task('test', 'Runs all tests.', ['lint', 'copy-test-fixtures'], () => { if (semver.satisfies(process.version, '^8.11.2 || >=9.4')) { return gulp.src(`${outDir}/test/**/*.js`) .pipe(mocha({reporter: 'mocha-jenkins-reporter', diff --git a/packages/grpc-js-core/src/call-credentials-filter.ts b/packages/grpc-js-core/src/call-credentials-filter.ts index c13df869..95aee592 100644 --- a/packages/grpc-js-core/src/call-credentials-filter.ts +++ b/packages/grpc-js-core/src/call-credentials-filter.ts @@ -1,5 +1,3 @@ -import {promisify} from 'util'; - import {CallCredentials} from './call-credentials'; import {Call} from './call-stream'; import {Http2Channel} from './channel'; diff --git a/packages/grpc-js-core/src/call-credentials.ts b/packages/grpc-js-core/src/call-credentials.ts index 7d5bd332..2a9d8195 100644 --- a/packages/grpc-js-core/src/call-credentials.ts +++ b/packages/grpc-js-core/src/call-credentials.ts @@ -1,4 +1,4 @@ -import {map, reduce} from 'lodash'; +import {map} from 'lodash'; import {Metadata} from './metadata'; diff --git a/packages/grpc-js-core/src/call-stream.ts b/packages/grpc-js-core/src/call-stream.ts index 87b38c45..20eca023 100644 --- a/packages/grpc-js-core/src/call-stream.ts +++ b/packages/grpc-js-core/src/call-stream.ts @@ -2,14 +2,13 @@ import * as http2 from 'http2'; import {Duplex} from 'stream'; import {CallCredentials} from './call-credentials'; +import {Http2Channel} from './channel'; import {Status} from './constants'; import {EmitterAugmentation1} from './events'; import {Filter} from './filter'; import {FilterStackFactory} from './filter-stack'; import {Metadata} from './metadata'; import {ObjectDuplex, WriteCallback} from './object-stream'; -import { Meta } from 'orchestrator'; -import { Channel, Http2Channel } from './channel'; const {HTTP2_HEADER_STATUS, HTTP2_HEADER_CONTENT_TYPE, NGHTTP2_CANCEL} = http2.constants; @@ -20,7 +19,7 @@ export interface CallStreamOptions { deadline: Deadline; flags: number; host: string; - parentCall: Call | null; + parentCall: Call|null; } export type PartialCallStreamOptions = Partial; @@ -67,16 +66,13 @@ enum ReadState { READING_MESSAGE } -const emptyBuffer = Buffer.alloc(0); - export class Http2CallStream extends Duplex implements Call { credentials: CallCredentials = CallCredentials.createEmpty(); filterStack: Filter; - private statusEmitted = false; private http2Stream: http2.ClientHttp2Stream|null = null; private pendingRead = false; private pendingWrite: Buffer|null = null; - private pendingWriteCallback: WriteCallback | null = null; + private pendingWriteCallback: WriteCallback|null = null; private pendingFinalCallback: Function|null = null; private readState: ReadState = ReadState.NO_DATA; @@ -384,7 +380,8 @@ export class Http2CallStream extends Duplex implements Call { } sendMetadata(metadata: Metadata): void { - this.channel._startHttp2Stream(this.options.host, this.methodName, this, metadata); + this.channel._startHttp2Stream( + this.options.host, this.methodName, this, metadata); } private destroyHttp2Stream() { diff --git a/packages/grpc-js-core/src/call.ts b/packages/grpc-js-core/src/call.ts index 9c864d8c..6e6f82aa 100644 --- a/packages/grpc-js-core/src/call.ts +++ b/packages/grpc-js-core/src/call.ts @@ -1,5 +1,4 @@ import {EventEmitter} from 'events'; -import * as _ from 'lodash'; import {Duplex, Readable, Writable} from 'stream'; import {Call, StatusObject, WriteObject} from './call-stream'; @@ -87,8 +86,6 @@ function setUpReadableStream( }); call.on('status', (status: StatusObject) => { if (status.code !== Status.OK) { - const statusName = _.invert(Status)[status.code]; - const message = `${status.code} ${statusName}: ${status.details}`; const error: ServiceError = Object.assign(new Error(status.details), status); stream.emit('error', error); @@ -124,8 +121,8 @@ export class ClientReadableStreamImpl extends Readable implements } function tryWrite( - call: Call, serialize: (value: RequestType) => Buffer, - chunk: RequestType, encoding: string, cb: Function) { + call: Call, serialize: (value: RequestType) => Buffer, chunk: RequestType, + encoding: string, cb: Function) { let message: Buffer; const flags: number = Number(encoding); try { diff --git a/packages/grpc-js-core/src/channel-credentials.ts b/packages/grpc-js-core/src/channel-credentials.ts index cb29fa64..b2c73efa 100644 --- a/packages/grpc-js-core/src/channel-credentials.ts +++ b/packages/grpc-js-core/src/channel-credentials.ts @@ -1,4 +1,4 @@ -import {createSecureContext, SecureContext, TLSSocket, ConnectionOptions, PeerCertificate} from 'tls'; +import {ConnectionOptions, createSecureContext, PeerCertificate} from 'tls'; import {CallCredentials} from './call-credentials'; @@ -25,7 +25,8 @@ export interface Certificate { * indicate that the presented certificate is considered invalid and * otherwise returned undefined. */ -export type CheckServerIdentityCallback = (hostname: string, cert: Certificate) => Error | undefined; +export type CheckServerIdentityCallback = + (hostname: string, cert: Certificate) => Error|undefined; /** * Additional peer verification options that can be set when creating @@ -87,7 +88,8 @@ export abstract class ChannelCredentials { */ static createSsl( rootCerts?: Buffer|null, privateKey?: Buffer|null, - certChain?: Buffer|null, verifyOptions?: VerifyOptions): ChannelCredentials { + certChain?: Buffer|null, + verifyOptions?: VerifyOptions): ChannelCredentials { verifyIsBufferOrNull(rootCerts, 'Root certificate'); verifyIsBufferOrNull(privateKey, 'Private key'); verifyIsBufferOrNull(certChain, 'Certificate chain'); @@ -104,13 +106,12 @@ export abstract class ChannelCredentials { key: privateKey || undefined, cert: certChain || undefined }); - let connectionOptions: ConnectionOptions = { - secureContext - }; + const connectionOptions: ConnectionOptions = {secureContext}; if (verifyOptions && verifyOptions.checkServerIdentity) { - connectionOptions.checkServerIdentity = (host: string, cert: PeerCertificate) => { - return verifyOptions.checkServerIdentity!(host, {raw: cert.raw}); - } + connectionOptions.checkServerIdentity = + (host: string, cert: PeerCertificate) => { + return verifyOptions.checkServerIdentity!(host, {raw: cert.raw}); + }; } return new SecureChannelCredentialsImpl(connectionOptions); } @@ -141,9 +142,10 @@ class InsecureChannelCredentialsImpl extends ChannelCredentials { } class SecureChannelCredentialsImpl extends ChannelCredentials { - connectionOptions: ConnectionOptions + connectionOptions: ConnectionOptions; - constructor(connectionOptions: ConnectionOptions, callCredentials?: CallCredentials) { + constructor( + connectionOptions: ConnectionOptions, callCredentials?: CallCredentials) { super(callCredentials); this.connectionOptions = connectionOptions; } diff --git a/packages/grpc-js-core/src/channel.ts b/packages/grpc-js-core/src/channel.ts index 41b52562..aeb7d765 100644 --- a/packages/grpc-js-core/src/channel.ts +++ b/packages/grpc-js-core/src/channel.ts @@ -1,25 +1,22 @@ import {EventEmitter} from 'events'; import * as http2 from 'http2'; -import {checkServerIdentity, PeerCertificate, SecureContext} from 'tls'; +import {checkServerIdentity, PeerCertificate} from 'tls'; import * as url from 'url'; -import {CallCredentials} from './call-credentials'; import {CallCredentialsFilterFactory} from './call-credentials-filter'; -import {PartialCallStreamOptions, Call, CallStreamOptions, Http2CallStream, Deadline} from './call-stream'; +import {Call, CallStreamOptions, Deadline, Http2CallStream} from './call-stream'; import {ChannelCredentials} from './channel-credentials'; +import {ChannelOptions, recognizedOptions} from './channel-options'; import {CompressionFilterFactory} from './compression-filter'; import {Status} from './constants'; import {DeadlineFilterFactory} from './deadline-filter'; import {FilterStackFactory} from './filter-stack'; -import {Metadata, MetadataObject} from './metadata'; +import {Metadata} from './metadata'; import {MetadataStatusFilterFactory} from './metadata-status-filter'; -import { Http2SubChannel } from './subchannel'; -import {ChannelOptions, recognizedOptions} from './channel-options'; +import {Http2SubChannel} from './subchannel'; const {version: clientVersion} = require('../../package'); -const IDLE_TIMEOUT_MS = 300000; - const MIN_CONNECT_TIMEOUT_MS = 20000; const INITIAL_BACKOFF_MS = 1000; const BACKOFF_MULTIPLIER = 1.6; @@ -31,7 +28,6 @@ const { HTTP2_HEADER_CONTENT_TYPE, HTTP2_HEADER_METHOD, HTTP2_HEADER_PATH, - HTTP2_HEADER_SCHEME, HTTP2_HEADER_TE, HTTP2_HEADER_USER_AGENT } = http2.constants; @@ -57,7 +53,8 @@ function uniformRandom(min: number, max: number) { */ export interface Channel { /** - * Close the channel. This has the same functionality as the existing grpc.Client.prototype.close + * Close the channel. This has the same functionality as the existing + * grpc.Client.prototype.close */ close(): void; /** @@ -83,7 +80,9 @@ export interface Channel { * @param callback Called with no error when a state change, or with an * error if the deadline passes without a state change. */ - watchConnectivityState(currentState: ConnectivityState, deadline: Date|number, callback: (error?: Error) => void): void; + watchConnectivityState( + currentState: ConnectivityState, deadline: Date|number, + callback: (error?: Error) => void): void; /** * Create a call object. Call is an opaque type that is used by the Client * class. This function is called by the gRPC library when starting a @@ -96,7 +95,10 @@ export interface Channel { * @param propagateFlags A bitwise combination of elements of grpc.propagate * that indicates what information to propagate from parentCall. */ - createCall(method: string, deadline: Deadline|null|undefined, host: string|null|undefined, parentCall: Call|null|undefined, propagateFlags: number|null|undefined): Call; + createCall( + method: string, deadline: Deadline|null|undefined, + host: string|null|undefined, parentCall: Call|null|undefined, + propagateFlags: number|null|undefined): Call; } export class Http2Channel extends EventEmitter implements Channel { @@ -177,7 +179,8 @@ export class Http2Channel extends EventEmitter implements Channel { } private startConnecting(): void { - let connectionOptions: http2.SecureClientSessionOptions = this.credentials._getConnectionOptions() || {}; + const connectionOptions: http2.SecureClientSessionOptions = + this.credentials._getConnectionOptions() || {}; if (connectionOptions.secureContext !== null) { // If provided, the value of grpc.ssl_target_name_override should be used // to override the target hostname when checking server identity. @@ -192,7 +195,8 @@ export class Http2Channel extends EventEmitter implements Channel { connectionOptions.servername = sslTargetNameOverride; } } - const subChannel: Http2SubChannel = new Http2SubChannel(this.target, connectionOptions, this.userAgent, this.options); + const subChannel: Http2SubChannel = new Http2SubChannel( + this.target, connectionOptions, this.userAgent, this.options); this.subChannel = subChannel; const now = new Date(); const connectionTimeout: number = Math.max( @@ -226,10 +230,11 @@ export class Http2Channel extends EventEmitter implements Channel { address: string, readonly credentials: ChannelCredentials, private readonly options: Partial) { super(); - for (let option in options) { + for (const option in options) { if (options.hasOwnProperty(option)) { if (!recognizedOptions.hasOwnProperty(option)) { - console.warn(`Unrecognized channel argument '${option}' will be ignored.`); + console.warn( + `Unrecognized channel argument '${option}' will be ignored.`); } } } @@ -294,19 +299,22 @@ export class Http2Channel extends EventEmitter implements Channel { }); } - createCall(method: string, deadline: Deadline|null|undefined, host: string|null|undefined, parentCall: Call|null|undefined, propagateFlags: number|null|undefined): - Call { + createCall( + method: string, deadline: Deadline|null|undefined, + host: string|null|undefined, parentCall: Call|null|undefined, + propagateFlags: number|null|undefined): Call { if (this.connectivityState === ConnectivityState.SHUTDOWN) { throw new Error('Channel has been shut down'); } const finalOptions: CallStreamOptions = { - deadline: (deadline === null || deadline == undefined) ? Infinity : deadline, + deadline: (deadline === null || deadline === undefined) ? Infinity : + deadline, flags: propagateFlags || 0, host: host || this.defaultAuthority, parentCall: parentCall || null }; - const stream: Http2CallStream = - new Http2CallStream(method, this, finalOptions, this.filterStackFactory); + const stream: Http2CallStream = new Http2CallStream( + method, this, finalOptions, this.filterStackFactory); return stream; } @@ -347,18 +355,21 @@ export class Http2Channel extends EventEmitter implements Channel { getConnectivityState(tryToConnect: boolean): ConnectivityState { if (tryToConnect) { - this.transitionToState([ConnectivityState.IDLE], ConnectivityState.CONNECTING); + this.transitionToState( + [ConnectivityState.IDLE], ConnectivityState.CONNECTING); } return this.connectivityState; } - watchConnectivityState(currentState: ConnectivityState, deadline: Date|number, callback: (error?: Error)=>void) { + watchConnectivityState( + currentState: ConnectivityState, deadline: Date|number, + callback: (error?: Error) => void) { if (this.connectivityState !== currentState) { /* If the connectivity state is different from the provided currentState, * we assume that a state change has successfully occurred */ setImmediate(callback); } else { - let deadlineMs: number = 0; + let deadlineMs = 0; if (deadline instanceof Date) { deadlineMs = deadline.getTime(); } else { @@ -368,11 +379,11 @@ export class Http2Channel extends EventEmitter implements Channel { if (timeout < 0) { timeout = 0; } - let timeoutId = setTimeout(() => { + const timeoutId = setTimeout(() => { this.removeListener('connectivityStateChanged', eventCb); callback(new Error('Channel state did not change before deadline')); }, timeout); - let eventCb = () => { + const eventCb = () => { clearTimeout(timeoutId); callback(); }; diff --git a/packages/grpc-js-core/src/client.ts b/packages/grpc-js-core/src/client.ts index 7bfc19bb..7381f4da 100644 --- a/packages/grpc-js-core/src/client.ts +++ b/packages/grpc-js-core/src/client.ts @@ -1,14 +1,11 @@ -import {once} from 'lodash'; -import {URL} from 'url'; - import {ClientDuplexStream, ClientDuplexStreamImpl, ClientReadableStream, ClientReadableStreamImpl, ClientUnaryCall, ClientUnaryCallImpl, ClientWritableStream, ClientWritableStreamImpl, ServiceError} from './call'; -import {PartialCallStreamOptions, Call, StatusObject, WriteObject, Deadline} from './call-stream'; -import {Channel, Http2Channel, ConnectivityState} from './channel'; +import {CallCredentials} from './call-credentials'; +import {Call, Deadline, StatusObject, WriteObject} from './call-stream'; +import {Channel, ConnectivityState, Http2Channel} from './channel'; import {ChannelCredentials} from './channel-credentials'; +import {ChannelOptions} from './channel-options'; import {Status} from './constants'; import {Metadata} from './metadata'; -import {ChannelOptions} from './channel-options'; -import { CallCredentials } from './call-credentials'; // This symbol must be exported (for now). // See: https://github.com/Microsoft/TypeScript/issues/20080 @@ -19,16 +16,17 @@ export interface UnaryCallback { } export interface CallOptions { - deadline?: Deadline, - host?: string, - parent?: Call, - propagate_flags?: number, - credentials?: CallCredentials + deadline?: Deadline; + host?: string; + parent?: Call; + propagate_flags?: number; + credentials?: CallCredentials; } -export type ClientOptions = Partial & { +export type ClientOptions = Partial&{ channelOverride?: Channel, - channelFactoryOverride?: (address: string, credentials: ChannelCredentials, options: ClientOptions) => Channel + channelFactoryOverride?: (address: string, credentials: ChannelCredentials, + options: ClientOptions) => Channel }; /** @@ -43,7 +41,8 @@ export class Client { if (options.channelOverride) { this[kChannel] = options.channelOverride; } else if (options.channelFactoryOverride) { - this[kChannel] = options.channelFactoryOverride(address, credentials, options); + this[kChannel] = + options.channelFactoryOverride(address, credentials, options); } else { this[kChannel] = new Http2Channel(address, credentials, options); } @@ -57,31 +56,30 @@ export class Client { return this[kChannel]; } - waitForReady(deadline: Deadline, callback: (error?: Error) => void): - void { - const checkState = (err?: Error) => { - if (err) { - callback(new Error('Failed to connect before the deadline')); - return; - } - var new_state; + waitForReady(deadline: Deadline, callback: (error?: Error) => void): void { + const checkState = (err?: Error) => { + if (err) { + callback(new Error('Failed to connect before the deadline')); + return; + } + let newState; + try { + newState = this[kChannel].getConnectivityState(true); + } catch (e) { + callback(new Error('The channel has been closed')); + return; + } + if (newState === ConnectivityState.READY) { + callback(); + } else { try { - new_state = this[kChannel].getConnectivityState(true); + this[kChannel].watchConnectivityState(newState, deadline, checkState); } catch (e) { callback(new Error('The channel has been closed')); - return; } - if (new_state === ConnectivityState.READY) { - callback(); - } else { - try { - this[kChannel].watchConnectivityState(new_state, deadline, checkState); - } catch (e) { - callback(new Error('The channel has been closed')); - } - } - }; - setImmediate(checkState); + } + }; + setImmediate(checkState); } private handleUnaryResponse( @@ -172,8 +170,9 @@ export class Client { ({metadata, options, callback} = this.checkOptionalUnaryResponseArguments( metadata, options, callback)); - const call: Call = - this[kChannel].createCall(method, options.deadline, options.host, options.parent, options.propagate_flags); + const call: Call = this[kChannel].createCall( + method, options.deadline, options.host, options.parent, + options.propagate_flags); if (options.credentials) { call.setCredentials(options.credentials); } @@ -213,8 +212,9 @@ export class Client { ({metadata, options, callback} = this.checkOptionalUnaryResponseArguments( metadata, options, callback)); - const call: Call = - this[kChannel].createCall(method, options.deadline, options.host, options.parent, options.propagate_flags); + const call: Call = this[kChannel].createCall( + method, options.deadline, options.host, options.parent, + options.propagate_flags); if (options.credentials) { call.setCredentials(options.credentials); } @@ -261,8 +261,9 @@ export class Client { metadata?: Metadata|CallOptions, options?: CallOptions): ClientReadableStream { ({metadata, options} = this.checkMetadataAndOptions(metadata, options)); - const call: Call = - this[kChannel].createCall(method, options.deadline, options.host, options.parent, options.propagate_flags); + const call: Call = this[kChannel].createCall( + method, options.deadline, options.host, options.parent, + options.propagate_flags); if (options.credentials) { call.setCredentials(options.credentials); } @@ -288,8 +289,9 @@ export class Client { metadata?: Metadata|CallOptions, options?: CallOptions): ClientDuplexStream { ({metadata, options} = this.checkMetadataAndOptions(metadata, options)); - const call: Call = - this[kChannel].createCall(method, options.deadline, options.host, options.parent, options.propagate_flags); + const call: Call = this[kChannel].createCall( + method, options.deadline, options.host, options.parent, + options.propagate_flags); if (options.credentials) { call.setCredentials(options.credentials); } diff --git a/packages/grpc-js-core/src/compression-filter.ts b/packages/grpc-js-core/src/compression-filter.ts index d694f239..e4ed9604 100644 --- a/packages/grpc-js-core/src/compression-filter.ts +++ b/packages/grpc-js-core/src/compression-filter.ts @@ -2,7 +2,6 @@ import * as zlib from 'zlib'; import {Call, WriteFlags, WriteObject} from './call-stream'; import {Channel} from './channel'; -import {Status} from './constants'; import {BaseFilter, Filter, FilterFactory} from './filter'; import {Metadata, MetadataValue} from './metadata'; @@ -19,7 +18,7 @@ abstract class CompressionHandler { if (compress) { messageBuffer = await this.compressMessage(messageBuffer); } - let output = Buffer.allocUnsafe(messageBuffer.length + 5); + const output = Buffer.allocUnsafe(messageBuffer.length + 5); output.writeUInt8(compress ? 1 : 0, 0); output.writeUInt32BE(messageBuffer.length, 1); messageBuffer.copy(output, 5); @@ -45,7 +44,7 @@ class IdentityHandler extends CompressionHandler { } async writeMessage(message: Buffer, compress: boolean): Promise { - let output = Buffer.allocUnsafe(message.length + 5); + const output = Buffer.allocUnsafe(message.length + 5); /* With "identity" compression, messages should always be marked as * uncompressed */ output.writeUInt8(0, 0); @@ -62,49 +61,53 @@ class IdentityHandler extends CompressionHandler { class DeflateHandler extends CompressionHandler { compressMessage(message: Buffer) { - return new Promise( - (resolve, reject) => {zlib.deflate(message, (err, output) => { - if (err) { - reject(err); - } else { - resolve(output); - } - })}); + return new Promise((resolve, reject) => { + zlib.deflate(message, (err, output) => { + if (err) { + reject(err); + } else { + resolve(output); + } + }); + }); } decompressMessage(message: Buffer) { - return new Promise( - (resolve, reject) => {zlib.inflate(message, (err, output) => { - if (err) { - reject(err); - } else { - resolve(output); - } - })}); + return new Promise((resolve, reject) => { + zlib.inflate(message, (err, output) => { + if (err) { + reject(err); + } else { + resolve(output); + } + }); + }); } } class GzipHandler extends CompressionHandler { compressMessage(message: Buffer) { - return new Promise( - (resolve, reject) => {zlib.gzip(message, (err, output) => { - if (err) { - reject(err); - } else { - resolve(output); - } - })}); + return new Promise((resolve, reject) => { + zlib.gzip(message, (err, output) => { + if (err) { + reject(err); + } else { + resolve(output); + } + }); + }); } decompressMessage(message: Buffer) { - return new Promise( - (resolve, reject) => {zlib.unzip(message, (err, output) => { - if (err) { - reject(err); - } else { - resolve(output); - } - })}); + return new Promise((resolve, reject) => { + zlib.unzip(message, (err, output) => { + if (err) { + reject(err); + } else { + resolve(output); + } + }); + }); } } @@ -150,7 +153,7 @@ export class CompressionFilter extends BaseFilter implements Filter { async receiveMetadata(metadata: Promise): Promise { const headers: Metadata = await metadata; - let receiveEncoding: MetadataValue[] = headers.get('grpc-encoding'); + const receiveEncoding: MetadataValue[] = headers.get('grpc-encoding'); if (receiveEncoding.length > 0) { const encoding: MetadataValue = receiveEncoding[0]; if (typeof encoding === 'string') { diff --git a/packages/grpc-js-core/src/deadline-filter.ts b/packages/grpc-js-core/src/deadline-filter.ts index 0e288608..fd52ebbc 100644 --- a/packages/grpc-js-core/src/deadline-filter.ts +++ b/packages/grpc-js-core/src/deadline-filter.ts @@ -1,5 +1,5 @@ import {Call} from './call-stream'; -import {Channel, Http2Channel, ConnectivityState} from './channel'; +import {ConnectivityState, Http2Channel} from './channel'; import {Status} from './constants'; import {BaseFilter, Filter, FilterFactory} from './filter'; import {Metadata} from './metadata'; @@ -51,22 +51,25 @@ export class DeadlineFilter extends BaseFilter implements Filter { return metadata; } return new Promise((resolve, reject) => { - if (this.channel.getConnectivityState(false) === ConnectivityState.READY) { - resolve(metadata); - } else { - const handleStateChange = (newState: ConnectivityState) => { - if (newState === ConnectivityState.READY) { - resolve(metadata); - this.channel.removeListener('connectivityStateChanged', handleStateChange); - } - }; - this.channel.on('connectivityStateChanged', handleStateChange); - } - }).then((finalMetadata: Metadata) => { - const timeoutString = getDeadline(this.deadline); - finalMetadata.set('grpc-timeout', timeoutString); - return finalMetadata; - }); + if (this.channel.getConnectivityState(false) === + ConnectivityState.READY) { + resolve(metadata); + } else { + const handleStateChange = (newState: ConnectivityState) => { + if (newState === ConnectivityState.READY) { + resolve(metadata); + this.channel.removeListener( + 'connectivityStateChanged', handleStateChange); + } + }; + this.channel.on('connectivityStateChanged', handleStateChange); + } + }) + .then((finalMetadata: Metadata) => { + const timeoutString = getDeadline(this.deadline); + finalMetadata.set('grpc-timeout', timeoutString); + return finalMetadata; + }); } } diff --git a/packages/grpc-js-core/src/index.ts b/packages/grpc-js-core/src/index.ts index 79318a6d..f7a22a0d 100644 --- a/packages/grpc-js-core/src/index.ts +++ b/packages/grpc-js-core/src/index.ts @@ -1,16 +1,14 @@ - -import {IncomingHttpHeaders} from 'http'; +import * as semver from 'semver'; import {CallCredentials} from './call-credentials'; +import {Channel} from './channel'; import {ChannelCredentials} from './channel-credentials'; import {Client} from './client'; import {LogVerbosity, Status} from './constants'; +import * as logging from './logging'; import {loadPackageDefinition, makeClientConstructor} from './make-client'; import {Metadata} from './metadata'; -import { Channel } from './channel'; import {StatusBuilder} from './status-builder'; -import * as logging from './logging'; -import * as semver from 'semver'; const supportedNodeVersions = '^8.11.2 || >=9.4'; if (!semver.satisfies(process.version, supportedNodeVersions)) { @@ -18,15 +16,15 @@ if (!semver.satisfies(process.version, supportedNodeVersions)) { } interface IndexedObject { - [key: string]: any; - [key: number]: any; + [key: string]: any; // tslint:disable-line no-any + [key: number]: any; // tslint:disable-line no-any } function mixin(...sources: IndexedObject[]) { const result: {[key: string]: Function} = {}; for (const source of sources) { for (const propName of Object.getOwnPropertyNames(source)) { - const property: any = source[propName]; + const property: any = source[propName]; // tslint:disable-line no-any if (typeof property === 'function') { result[propName] = property; } diff --git a/packages/grpc-js-core/src/logging.ts b/packages/grpc-js-core/src/logging.ts index b5e151c4..a0e4e8c5 100644 --- a/packages/grpc-js-core/src/logging.ts +++ b/packages/grpc-js-core/src/logging.ts @@ -15,6 +15,7 @@ export const setLoggerVerbosity = (verbosity: LogVerbosity): void => { _logVerbosity = verbosity; }; +// tslint:disable-next-line no-any export const log = (severity: LogVerbosity, ...args: any[]): void => { if (severity >= _logVerbosity && typeof _logger.error === 'function') { _logger.error(...args); diff --git a/packages/grpc-js-core/src/make-client.ts b/packages/grpc-js-core/src/make-client.ts index fff339f4..669723d0 100644 --- a/packages/grpc-js-core/src/make-client.ts +++ b/packages/grpc-js-core/src/make-client.ts @@ -1,18 +1,12 @@ import * as _ from 'lodash'; -import {PartialCallStreamOptions} from './call-stream'; -import {ChannelOptions} from './channel-options'; import {ChannelCredentials} from './channel-credentials'; -import {Client, UnaryCallback} from './client'; -import {Metadata} from './metadata'; +import {ChannelOptions} from './channel-options'; +import {Client} from './client'; -export interface Serialize { - (value: T): Buffer; -} +export interface Serialize { (value: T): Buffer; } -export interface Deserialize { - (bytes: Buffer): T; -} +export interface Deserialize { (bytes: Buffer): T; } export interface MethodDefinition { path: string; @@ -29,14 +23,7 @@ export interface ServiceDefinition { [index: string]: MethodDefinition; } -export interface PackageDefinition { - [index: string]: ServiceDefinition; -} - -function getDefaultValues(metadata?: Metadata, options?: T): - {metadata: Metadata; options: Partial;} { - return {metadata: metadata || new Metadata(), options: options || {}}; -} +export interface PackageDefinition { [index: string]: ServiceDefinition; } /** * Map with short names for each of the requester maker functions. Used in diff --git a/packages/grpc-js-core/src/metadata.ts b/packages/grpc-js-core/src/metadata.ts index e53b18a9..48cfb6f3 100644 --- a/packages/grpc-js-core/src/metadata.ts +++ b/packages/grpc-js-core/src/metadata.ts @@ -5,9 +5,7 @@ const LEGAL_NON_BINARY_VALUE_REGEX = /^[ -~]*$/; export type MetadataValue = string|Buffer; -export interface MetadataObject { - [key: string]: MetadataValue[]; -} +export interface MetadataObject { [key: string]: MetadataValue[]; } function cloneMetadataObject(repr: MetadataObject): MetadataObject { const result: MetadataObject = {}; diff --git a/packages/grpc-js-core/src/object-stream.ts b/packages/grpc-js-core/src/object-stream.ts index dca37e79..b298ed02 100644 --- a/packages/grpc-js-core/src/object-stream.ts +++ b/packages/grpc-js-core/src/object-stream.ts @@ -3,7 +3,7 @@ import {EmitterAugmentation1} from './events'; // tslint:disable:no-any -export type WriteCallback = (error: Error | null | undefined) => void; +export type WriteCallback = (error: Error|null|undefined) => void; export interface IntermediateObjectReadable extends Readable { read(size?: number): any&T; diff --git a/packages/grpc-js-core/src/status-builder.ts b/packages/grpc-js-core/src/status-builder.ts index 0ba79bb5..dd0859a0 100644 --- a/packages/grpc-js-core/src/status-builder.ts +++ b/packages/grpc-js-core/src/status-builder.ts @@ -13,7 +13,7 @@ export class StatusBuilder { constructor() { this.code = null; this.details = null; - this.metadata = null; + this.metadata = null; } /** diff --git a/packages/grpc-js-core/src/subchannel.ts b/packages/grpc-js-core/src/subchannel.ts index f793aebb..13612a88 100644 --- a/packages/grpc-js-core/src/subchannel.ts +++ b/packages/grpc-js-core/src/subchannel.ts @@ -1,18 +1,16 @@ +import {EventEmitter} from 'events'; import * as http2 from 'http2'; import * as url from 'url'; -import { EventEmitter } from "events"; -import { Metadata } from "./metadata"; -import { Call, PartialCallStreamOptions, Http2CallStream } from "./call-stream"; -import { EmitterAugmentation1, EmitterAugmentation0 } from "./events"; -import { ChannelOptions } from './channel-options'; +import {Call, Http2CallStream} from './call-stream'; +import {ChannelOptions} from './channel-options'; +import {Metadata} from './metadata'; const { HTTP2_HEADER_AUTHORITY, HTTP2_HEADER_CONTENT_TYPE, HTTP2_HEADER_METHOD, HTTP2_HEADER_PATH, - HTTP2_HEADER_SCHEME, HTTP2_HEADER_TE, HTTP2_HEADER_USER_AGENT } = http2.constants; @@ -35,7 +33,7 @@ export interface SubChannel extends EventEmitter { export class Http2SubChannel extends EventEmitter implements SubChannel { private session: http2.ClientHttp2Session; - private refCount: number = 0; + private refCount = 0; private userAgent: string; private keepaliveTimeMs: number = KEEPALIVE_TIME_MS; @@ -43,8 +41,9 @@ export class Http2SubChannel extends EventEmitter implements SubChannel { private keepaliveIntervalId: NodeJS.Timer; private keepaliveTimeoutId: NodeJS.Timer; - constructor(target: url.URL, connectionOptions: http2.SecureClientSessionOptions, - userAgent: string, channelArgs: Partial) { + constructor( + target: url.URL, connectionOptions: http2.SecureClientSessionOptions, + userAgent: string, channelArgs: Partial) { super(); this.session = http2.connect(target, connectionOptions); this.session.on('connect', () => { @@ -57,7 +56,7 @@ export class Http2SubChannel extends EventEmitter implements SubChannel { this.session.on('error', () => { this.stopKeepalivePings(); this.emit('close'); - }) + }); this.userAgent = userAgent; if (channelArgs['grpc.keepalive_time_ms']) { @@ -92,7 +91,7 @@ export class Http2SubChannel extends EventEmitter implements SubChannel { this.keepaliveTimeoutId = setTimeout(() => { this.emit('close'); }, this.keepaliveTimeoutMs); - this.session.ping((err: Error | null, duration: number, payload: Buffer) => { + this.session.ping((err: Error|null, duration: number, payload: Buffer) => { clearTimeout(this.keepaliveTimeoutId); }); } @@ -120,15 +119,15 @@ export class Http2SubChannel extends EventEmitter implements SubChannel { headers[HTTP2_HEADER_METHOD] = 'POST'; headers[HTTP2_HEADER_PATH] = callStream.getMethod(); headers[HTTP2_HEADER_TE] = 'trailers'; - let http2Stream = this.session.request(headers); + const http2Stream = this.session.request(headers); this.ref(); http2Stream.on('close', () => { this.unref(); }); callStream.attachHttp2Stream(http2Stream); } - + close() { this.session.close(); } -} \ No newline at end of file +} diff --git a/packages/grpc-js-core/test/test-call-stream.ts b/packages/grpc-js-core/test/test-call-stream.ts index 902007fd..33a5991c 100644 --- a/packages/grpc-js-core/test/test-call-stream.ts +++ b/packages/grpc-js-core/test/test-call-stream.ts @@ -1,7 +1,6 @@ import * as assert from 'assert'; -import {EventEmitter} from 'events'; import * as http2 from 'http2'; -import {forOwn, range} from 'lodash'; +import {range} from 'lodash'; import * as stream from 'stream'; import {CallCredentials} from '../src/call-credentials'; @@ -79,18 +78,14 @@ class ClientHttp2StreamMock extends stream.Duplex implements } describe('CallStream', () => { - const callStreamArgs = { - deadline: Infinity, - flags: 0, - host: '', - parentCall: null - }; + const callStreamArgs = + {deadline: Infinity, flags: 0, host: '', parentCall: null}; /* A CompressionFilter is now necessary to frame and deframe messages. * Currently the channel is unused, so we can replace it with an empty object, * but this might break if we start checking channel arguments, in which case * we will need a more sophisticated fake */ const filterStackFactory = - new FilterStackFactory([new CompressionFilterFactory({})]); + new FilterStackFactory([new CompressionFilterFactory({} as Channel)]); const message = 'eat this message'; // 16 bytes beforeEach(() => { @@ -101,8 +96,8 @@ describe('CallStream', () => { (done) => { const responseMetadata = new Metadata(); responseMetadata.add('key', 'value'); - const callStream = - new Http2CallStream('foo', {}, callStreamArgs, filterStackFactory); + const callStream = new Http2CallStream( + 'foo', {} as Http2Channel, callStreamArgs, filterStackFactory); const http2Stream = new ClientHttp2StreamMock( {payload: Buffer.alloc(0), frameLengths: []}); @@ -139,8 +134,8 @@ describe('CallStream', () => { const maybeSkip = (fn: typeof it) => value ? fn : fn.skip; maybeSkip(it)(`for error code ${key}`, () => { return new Promise((resolve, reject) => { - const callStream = - new Http2CallStream('foo', {}, callStreamArgs, filterStackFactory); + const callStream = new Http2CallStream( + 'foo', {} as Http2Channel, callStreamArgs, filterStackFactory); const http2Stream = new ClientHttp2StreamMock( {payload: Buffer.alloc(0), frameLengths: []}); callStream.attachHttp2Stream(http2Stream); @@ -159,8 +154,8 @@ describe('CallStream', () => { }); it('should have functioning getters', (done) => { - const callStream = - new Http2CallStream('foo', {}, callStreamArgs, filterStackFactory); + const callStream = new Http2CallStream( + 'foo', {} as Http2Channel, callStreamArgs, filterStackFactory); assert.strictEqual(callStream.getDeadline(), callStreamArgs.deadline); assert.strictEqual(callStream.getStatus(), null); const credentials = CallCredentials.createEmpty(); @@ -178,8 +173,8 @@ describe('CallStream', () => { describe('attachHttp2Stream', () => { it('should handle an empty message', (done) => { - const callStream = - new Http2CallStream('foo', {}, callStreamArgs, filterStackFactory); + const callStream = new Http2CallStream( + 'foo', {} as Http2Channel, callStreamArgs, filterStackFactory); const http2Stream = new ClientHttp2StreamMock({payload: serialize(''), frameLengths: []}); callStream.once('data', assert2.mustCall((buffer) => { @@ -205,8 +200,8 @@ describe('CallStream', () => { }].forEach((testCase: {description: string, frameLengths: number[]}) => { it(`should handle a short message where ${testCase.description}`, (done) => { - const callStream = - new Http2CallStream('foo', {}, callStreamArgs, filterStackFactory); + const callStream = new Http2CallStream( + 'foo', {} as Http2Channel, callStreamArgs, filterStackFactory); const http2Stream = new ClientHttp2StreamMock({ payload: serialize(message), // 21 bytes frameLengths: testCase.frameLengths @@ -235,8 +230,8 @@ describe('CallStream', () => { frameLengths: range(0, 41).map(() => 1) }].forEach((testCase: {description: string, frameLengths: number[]}) => { it(`should handle two messages where ${testCase.description}`, (done) => { - const callStream = - new Http2CallStream('foo', {}, callStreamArgs, filterStackFactory); + const callStream = new Http2CallStream( + 'foo', {} as Http2Channel, callStreamArgs, filterStackFactory); const http2Stream = new ClientHttp2StreamMock({ payload: Buffer.concat( [serialize(message), serialize(message)]), // 42 bytes @@ -255,8 +250,8 @@ describe('CallStream', () => { }); it('should send buffered writes', (done) => { - const callStream = - new Http2CallStream('foo', {}, callStreamArgs, filterStackFactory); + const callStream = new Http2CallStream( + 'foo', {} as Http2Channel, callStreamArgs, filterStackFactory); const http2Stream = new ClientHttp2StreamMock( {payload: Buffer.alloc(0), frameLengths: []}); let streamFlushed = false; @@ -278,8 +273,8 @@ describe('CallStream', () => { it('should cause data chunks in write calls afterward to be written to the given stream', (done) => { - const callStream = - new Http2CallStream('foo', {}, callStreamArgs, filterStackFactory); + const callStream = new Http2CallStream( + 'foo', {} as Http2Channel, callStreamArgs, filterStackFactory); const http2Stream = new ClientHttp2StreamMock( {payload: Buffer.alloc(0), frameLengths: []}); http2Stream.once('write', assert2.mustCall((chunk: Buffer) => { @@ -296,8 +291,8 @@ describe('CallStream', () => { }); it('should handle underlying stream errors', () => { - const callStream = - new Http2CallStream('foo', {}, callStreamArgs, filterStackFactory); + const callStream = new Http2CallStream( + 'foo', {} as Http2Channel, callStreamArgs, filterStackFactory); const http2Stream = new ClientHttp2StreamMock( {payload: Buffer.alloc(0), frameLengths: []}); callStream.once('status', assert2.mustCall((status) => { diff --git a/packages/grpc-js-core/test/test-logging.ts b/packages/grpc-js-core/test/test-logging.ts index c1c0c25b..4f8f64a9 100644 --- a/packages/grpc-js-core/test/test-logging.ts +++ b/packages/grpc-js-core/test/test-logging.ts @@ -4,7 +4,7 @@ import * as grpc from '../src'; import * as logging from '../src/logging'; describe('Logging', () => { - afterEach(function() { + afterEach(() => { // Ensure that the logger is restored to its defaults after each test. grpc.setLogger(console); grpc.setLogVerbosity(grpc.logVerbosity.DEBUG); @@ -22,9 +22,9 @@ describe('Logging', () => { }); it('gates logging based on severity', () => { - const output: any[] = []; + const output: Array = []; const logger: Partial = { - error(...args: any[]): void { + error(...args: string[]): void { output.push(args); } }; @@ -48,13 +48,8 @@ describe('Logging', () => { logging.log(grpc.logVerbosity.INFO, 7, 8); logging.log(grpc.logVerbosity.ERROR, 'j', 'k'); - assert.deepStrictEqual(output, [ - ['a', 'b', 'c'], - ['d', 'e'], - ['f'], - ['g'], - ['h', 'i'], - ['j', 'k'] - ]); + assert.deepStrictEqual( + output, + [['a', 'b', 'c'], ['d', 'e'], ['f'], ['g'], ['h', 'i'], ['j', 'k']]); }); }); diff --git a/packages/grpc-js-core/test/test-status-builder.ts b/packages/grpc-js-core/test/test-status-builder.ts index 1aace004..ce27518d 100644 --- a/packages/grpc-js-core/test/test-status-builder.ts +++ b/packages/grpc-js-core/test/test-status-builder.ts @@ -16,19 +16,14 @@ describe('StatusBuilder', () => { assert.deepStrictEqual(builder.build(), {}); result = builder.withCode(grpc.status.OK); assert.strictEqual(result, builder); - assert.deepStrictEqual(builder.build(), { code: grpc.status.OK }); + assert.deepStrictEqual(builder.build(), {code: grpc.status.OK}); result = builder.withDetails('foobar'); assert.strictEqual(result, builder); - assert.deepStrictEqual(builder.build(), { - code: grpc.status.OK, - details: 'foobar' - }); + assert.deepStrictEqual( + builder.build(), {code: grpc.status.OK, details: 'foobar'}); result = builder.withMetadata(metadata); assert.strictEqual(result, builder); - assert.deepStrictEqual(builder.build(), { - code: grpc.status.OK, - details: 'foobar', - metadata - }); + assert.deepStrictEqual( + builder.build(), {code: grpc.status.OK, details: 'foobar', metadata}); }); });