diff --git a/packages/grpc-js-core/package.json b/packages/grpc-js-core/package.json index e0d48bd4..ffdc2b4a 100644 --- a/packages/grpc-js-core/package.json +++ b/packages/grpc-js-core/package.json @@ -16,7 +16,7 @@ "devDependencies": { "@types/lodash": "^4.14.77", "@types/mocha": "^2.2.43", - "@types/node": "^8.0.55", + "@types/node": "^9.4.6", "clang-format": "^1.0.55", "gts": "^0.5.1", "typescript": "~2.7.0" diff --git a/packages/grpc-js-core/src/call-stream.ts b/packages/grpc-js-core/src/call-stream.ts index 877978a8..849d685f 100644 --- a/packages/grpc-js-core/src/call-stream.ts +++ b/packages/grpc-js-core/src/call-stream.ts @@ -9,7 +9,7 @@ import {FilterStackFactory} from './filter-stack'; import {Metadata} from './metadata'; import {ObjectDuplex} from './object-stream'; -const {HTTP2_HEADER_STATUS, HTTP2_HEADER_CONTENT_TYPE} = http2.constants; +const {HTTP2_HEADER_STATUS, HTTP2_HEADER_CONTENT_TYPE, NGHTTP2_CANCEL} = http2.constants; export type Deadline = Date | number; @@ -156,7 +156,7 @@ export class Http2CallStream extends Duplex implements CallStream { attachHttp2Stream(stream: http2.ClientHttp2Stream): void { if (this.finalStatus !== null) { - stream.rstWithCancel(); + stream.close(NGHTTP2_CANCEL); } else { this.http2Stream = stream; stream.on('response', (headers, flags) => { @@ -328,7 +328,7 @@ export class Http2CallStream extends Duplex implements CallStream { if (this.http2Stream !== null && !this.http2Stream.destroyed) { /* TODO(murgatroid99): Determine if we want to send different RST_STREAM * codes based on the status code */ - this.http2Stream.rstWithCancel(); + this.http2Stream.close(NGHTTP2_CANCEL); } } diff --git a/packages/grpc-js-core/src/call.ts b/packages/grpc-js-core/src/call.ts index 5abc0479..af305b24 100644 --- a/packages/grpc-js-core/src/call.ts +++ b/packages/grpc-js-core/src/call.ts @@ -6,17 +6,16 @@ import {CallStream, StatusObject, WriteObject} from './call-stream'; import {Status} from './constants'; import {Metadata} from './metadata'; import {ObjectReadable, ObjectWritable} from './object-stream'; +import * as _ from 'lodash'; -export interface ServiceError extends Error { - code?: number; - metadata?: Metadata; -} - -export class ServiceErrorImpl extends Error implements ServiceError { - code?: number; - metadata?: Metadata; -} +/** + * A type extending the built-in Error object with additional fields. + */ +export type ServiceError = StatusObject & Error; +/** + * A base type for all user-facing values returned by client-side method calls. + */ export type Call = { cancel(): void; getPeer(): string; @@ -24,16 +23,28 @@ export type Call = { & EmitterAugmentation1<'status', StatusObject> & EventEmitter; +/** + * A type representing the return value of a unary method call. + */ export type ClientUnaryCall = Call; +/** + * A type representing the return value of a server stream method call. + */ export type ClientReadableStream = { deserialize: (chunk: Buffer) => ResponseType; } & Call & ObjectReadable; +/** + * A type representing the return value of a client stream method call. + */ export type ClientWritableStream = { serialize: (value: RequestType) => Buffer; } & Call & ObjectWritable; +/** + * A type representing the return value of a bidirectional stream method call. + */ export type ClientDuplexStream = ClientWritableStream & ClientReadableStream; @@ -78,9 +89,9 @@ function setUpReadableStream( call.on('status', (status: StatusObject) => { stream.emit('status', status); if (status.code !== Status.OK) { - const error = new ServiceErrorImpl(status.details); - error.code = status.code; - error.metadata = status.metadata; + const statusName = _.invert(Status)[status.code]; + const message: string = `${status.code} ${statusName}: ${status.details}`; + const error: ServiceError = Object.assign(new Error(status.details), status); stream.emit('error', error); } }); diff --git a/packages/grpc-js-core/src/channel.ts b/packages/grpc-js-core/src/channel.ts index 51ec1fa5..193ae9a5 100644 --- a/packages/grpc-js-core/src/channel.ts +++ b/packages/grpc-js-core/src/channel.ts @@ -14,6 +14,8 @@ import {FilterStackFactory} from './filter-stack'; import {Metadata, MetadataObject} from './metadata'; import { MetadataStatusFilterFactory } from './metadata-status-filter'; +const { version: clientVersion } = require('../../package'); + const IDLE_TIMEOUT_MS = 300000; const MIN_CONNECT_TIMEOUT_MS = 20000; @@ -28,13 +30,19 @@ const { HTTP2_HEADER_METHOD, HTTP2_HEADER_PATH, HTTP2_HEADER_SCHEME, - HTTP2_HEADER_TE + HTTP2_HEADER_TE, + HTTP2_HEADER_USER_AGENT } = http2.constants; /** * An interface that contains options used when initializing a Channel instance. */ -export interface ChannelOptions { [index: string]: string|number; } +export interface ChannelOptions { + 'grpc.ssl_target_name_override': string; + 'grpc.primary_user_agent': string; + 'grpc.secondary_user_agent': string; + [key: string]: string | number; +} export enum ConnectivityState { CONNECTING, @@ -72,6 +80,7 @@ export interface Channel extends EventEmitter { } export class Http2Channel extends EventEmitter implements Channel { + private readonly userAgent: string; private readonly authority: url.URL; private connectivityState: ConnectivityState = ConnectivityState.IDLE; /* For now, we have up to one subchannel, which will exist as long as we are @@ -112,7 +121,7 @@ export class Http2Channel extends EventEmitter implements Channel { case ConnectivityState.IDLE: case ConnectivityState.SHUTDOWN: if (this.subChannel) { - this.subChannel.shutdown({graceful: true}); + this.subChannel.close(); this.subChannel.removeListener('connect', this.subChannelConnectCallback); this.subChannel.removeListener('close', this.subChannelCloseCallback); this.subChannel = null; @@ -145,7 +154,7 @@ export class Http2Channel extends EventEmitter implements Channel { // to override the target hostname when checking server identity. // This option is used for testing only. if (this.options['grpc.ssl_target_name_override']) { - const sslTargetNameOverride = this.options['grpc.ssl_target_name_override'] as string; + const sslTargetNameOverride = this.options['grpc.ssl_target_name_override']!; connectionOptions.checkServerIdentity = (host: string, cert: PeerCertificate): Error | undefined => { return checkServerIdentity(sslTargetNameOverride, cert); } @@ -159,7 +168,7 @@ export class Http2Channel extends EventEmitter implements Channel { MIN_CONNECT_TIMEOUT_MS); let connectionTimerId: NodeJS.Timer = setTimeout(() => { // This should trigger the 'close' event, which will send us back to TRANSIENT_FAILURE - subChannel.shutdown(); + subChannel.close(); }, connectionTimeout); this.subChannelConnectCallback = () => { // Connection succeeded @@ -181,7 +190,7 @@ export class Http2Channel extends EventEmitter implements Channel { constructor( address: string, public readonly credentials: ChannelCredentials, - private readonly options: ChannelOptions) { + private readonly options: Partial) { super(); if (credentials.getSecureContext() === null) { this.authority = new url.URL(`http://${address}`); @@ -199,16 +208,24 @@ export class Http2Channel extends EventEmitter implements Channel { * a value of type NodeJS.Timer. */ this.backoffTimerId = setTimeout(() => {}, 0); clearTimeout(this.backoffTimerId); + + // Build user-agent string. + this.userAgent = [ + options['grpc.primary_user_agent'], + `grpc-node-js/${clientVersion}`, + options['grpc.secondary_user_agent'] + ].filter(e => e).join(' '); // remove falsey values first } private startHttp2Stream( methodName: string, stream: Http2CallStream, metadata: Metadata) { let finalMetadata: Promise = - stream.filterStack.sendMetadata(Promise.resolve(metadata)); + stream.filterStack.sendMetadata(Promise.resolve(metadata.clone())); Promise.all([finalMetadata, this.connect()]) .then(([metadataValue]) => { let headers = metadataValue.toHttp2Headers(); headers[HTTP2_HEADER_AUTHORITY] = this.authority.hostname; + headers[HTTP2_HEADER_USER_AGENT] = this.userAgent; headers[HTTP2_HEADER_CONTENT_TYPE] = 'application/grpc'; headers[HTTP2_HEADER_METHOD] = 'POST'; headers[HTTP2_HEADER_PATH] = methodName; @@ -217,9 +234,7 @@ export class Http2Channel extends EventEmitter implements Channel { if (this.connectivityState === ConnectivityState.READY) { const session: http2.ClientHttp2Session = this.subChannel!; // Prevent the HTTP/2 session from keeping the process alive. - // TODO(kjin): Monitor nodejs/node#17620, which adds unref - // directly to the Http2Session object. - session.socket.unref(); + session.unref(); stream.attachHttp2Stream(session.request(headers)); } else { /* In this case, we lost the connection while finalizing diff --git a/packages/grpc-js-core/src/client.ts b/packages/grpc-js-core/src/client.ts index 862d212c..5f9c670b 100644 --- a/packages/grpc-js-core/src/client.ts +++ b/packages/grpc-js-core/src/client.ts @@ -1,7 +1,7 @@ import {once} from 'lodash'; import {URL} from 'url'; -import {ClientDuplexStream, ClientDuplexStreamImpl, ClientReadableStream, ClientReadableStreamImpl, ClientUnaryCall, ClientUnaryCallImpl, ClientWritableStream, ClientWritableStreamImpl, ServiceError, ServiceErrorImpl} from './call'; +import {ClientDuplexStream, ClientDuplexStreamImpl, ClientReadableStream, ClientReadableStreamImpl, ClientUnaryCall, ClientUnaryCallImpl, ClientWritableStream, ClientWritableStreamImpl, ServiceError} from './call'; import {CallOptions, CallStream, StatusObject, WriteObject} from './call-stream'; import {Channel, ChannelOptions, Http2Channel} from './channel'; import {ChannelCredentials} from './channel-credentials'; @@ -16,14 +16,7 @@ export class Client { private readonly channel: Channel; constructor( address: string, credentials: ChannelCredentials, - options: ChannelOptions = {}) { - if (options['grpc.primary_user_agent']) { - options['grpc.primary_user_agent'] += ' '; - } else { - options['grpc.primary_user_agent'] = ''; - } - // TODO(murgatroid99): Figure out how to get version number - // options['grpc.primary_user_agent'] += 'grpc-node/' + version; + options: Partial = {}) { this.channel = new Http2Channel(address, credentials, options); } @@ -35,7 +28,11 @@ export class Client { void { let cb: (error: Error|null) => void = once(callback); let callbackCalled = false; + let timer: NodeJS.Timer | null = null; this.channel.connect().then(() => { + if (timer) { + clearTimeout(timer); + } cb(null); }); if (deadline !== Infinity) { @@ -49,7 +46,7 @@ export class Client { if (timeout < 0) { timeout = 0; } - setTimeout(() => { + timer = setTimeout(() => { cb(new Error('Failed to connect before the deadline')); }, timeout); } @@ -83,9 +80,7 @@ export class Client { if (status.code === Status.OK) { callback(null, responseMessage as ResponseType); } else { - const error = new ServiceErrorImpl(status.details); - error.code = status.code; - error.metadata = status.metadata; + const error: ServiceError = Object.assign(new Error(status.details), status); callback(error); } }); diff --git a/packages/grpc-js-core/src/deadline-filter.ts b/packages/grpc-js-core/src/deadline-filter.ts index c3ed045b..428ed8a9 100644 --- a/packages/grpc-js-core/src/deadline-filter.ts +++ b/packages/grpc-js-core/src/deadline-filter.ts @@ -20,6 +20,7 @@ function getDeadline(deadline: number) { } export class DeadlineFilter extends BaseFilter implements Filter { + private timer: NodeJS.Timer | null = null; private deadline: number; constructor( private readonly channel: Http2Channel, @@ -37,10 +38,11 @@ export class DeadlineFilter extends BaseFilter implements Filter { timeout = 0; } if (this.deadline !== Infinity) { - setTimeout(() => { + this.timer = setTimeout(() => { callStream.cancelWithStatus( Status.DEADLINE_EXCEEDED, 'Deadline exceeded'); }, timeout); + callStream.on('status', () => clearTimeout(this.timer as NodeJS.Timer)); } } diff --git a/packages/grpc-js-core/src/metadata.ts b/packages/grpc-js-core/src/metadata.ts index dbdb4e91..84e9160e 100644 --- a/packages/grpc-js-core/src/metadata.ts +++ b/packages/grpc-js-core/src/metadata.ts @@ -181,6 +181,11 @@ export class Metadata { }); return result; } + + // For compatibility with the other Metadata implementation + private _getCoreRepresentation() { + return this.internalRepr; + } /** * Returns a new Metadata object based fields in a given IncomingHttpHeaders @@ -196,7 +201,8 @@ export class Metadata { result.add(key, Buffer.from(value, 'base64')); }); } else if (values !== undefined) { - result.add(key, Buffer.from(values, 'base64')); + values.split(',').map(v => v.trim()).forEach(v => + result.add(key, Buffer.from(v, 'base64'))); } } else { if (Array.isArray(values)) { @@ -204,7 +210,8 @@ export class Metadata { result.add(key, value); }); } else if (values !== undefined) { - result.add(key, values); + values.split(',').map(v => v.trim()).forEach(v => + result.add(key, v)); } } }); diff --git a/packages/grpc-js-core/test/test-call-stream.ts b/packages/grpc-js-core/test/test-call-stream.ts index 91d694dc..0e142e40 100644 --- a/packages/grpc-js-core/test/test-call-stream.ts +++ b/packages/grpc-js-core/test/test-call-stream.ts @@ -39,10 +39,13 @@ class ClientHttp2StreamMock extends stream.Duplex implements http2.ClientHttp2St bytesRead = 0; dataFrame = 0; aborted: boolean = false; + closed: boolean = false; destroyed: boolean = false; + pending: boolean = false; rstCode: number = 0; session: http2.Http2Session = {} as any; state: http2.StreamState = {} as any; + close = mockFunction; priority = mockFunction; rstStream = mockFunction; rstWithNoError = mockFunction;