Merge pull request #147 from kjin/grpc-js-core-work-6

grpc-js-core: fixes and update to use node 9.4 http2 api
This commit is contained in:
Kelvin Jin 2018-02-15 14:32:41 -05:00 committed by GitHub
commit 08b98238cf
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 75 additions and 42 deletions

View File

@ -16,7 +16,7 @@
"devDependencies": {
"@types/lodash": "^4.14.77",
"@types/mocha": "^2.2.43",
"@types/node": "^8.0.55",
"@types/node": "^9.4.6",
"clang-format": "^1.0.55",
"gts": "^0.5.1",
"typescript": "~2.7.0"

View File

@ -9,7 +9,7 @@ import {FilterStackFactory} from './filter-stack';
import {Metadata} from './metadata';
import {ObjectDuplex} from './object-stream';
const {HTTP2_HEADER_STATUS, HTTP2_HEADER_CONTENT_TYPE} = http2.constants;
const {HTTP2_HEADER_STATUS, HTTP2_HEADER_CONTENT_TYPE, NGHTTP2_CANCEL} = http2.constants;
export type Deadline = Date | number;
@ -156,7 +156,7 @@ export class Http2CallStream extends Duplex implements CallStream {
attachHttp2Stream(stream: http2.ClientHttp2Stream): void {
if (this.finalStatus !== null) {
stream.rstWithCancel();
stream.close(NGHTTP2_CANCEL);
} else {
this.http2Stream = stream;
stream.on('response', (headers, flags) => {
@ -328,7 +328,7 @@ export class Http2CallStream extends Duplex implements CallStream {
if (this.http2Stream !== null && !this.http2Stream.destroyed) {
/* TODO(murgatroid99): Determine if we want to send different RST_STREAM
* codes based on the status code */
this.http2Stream.rstWithCancel();
this.http2Stream.close(NGHTTP2_CANCEL);
}
}

View File

@ -6,17 +6,16 @@ import {CallStream, StatusObject, WriteObject} from './call-stream';
import {Status} from './constants';
import {Metadata} from './metadata';
import {ObjectReadable, ObjectWritable} from './object-stream';
import * as _ from 'lodash';
export interface ServiceError extends Error {
code?: number;
metadata?: Metadata;
}
export class ServiceErrorImpl extends Error implements ServiceError {
code?: number;
metadata?: Metadata;
}
/**
* A type extending the built-in Error object with additional fields.
*/
export type ServiceError = StatusObject & Error;
/**
* A base type for all user-facing values returned by client-side method calls.
*/
export type Call = {
cancel(): void;
getPeer(): string;
@ -24,16 +23,28 @@ export type Call = {
& EmitterAugmentation1<'status', StatusObject>
& EventEmitter;
/**
* A type representing the return value of a unary method call.
*/
export type ClientUnaryCall = Call;
/**
* A type representing the return value of a server stream method call.
*/
export type ClientReadableStream<ResponseType> = {
deserialize: (chunk: Buffer) => ResponseType;
} & Call & ObjectReadable<ResponseType>;
/**
* A type representing the return value of a client stream method call.
*/
export type ClientWritableStream<RequestType> = {
serialize: (value: RequestType) => Buffer;
} & Call & ObjectWritable<RequestType>;
/**
* A type representing the return value of a bidirectional stream method call.
*/
export type ClientDuplexStream<RequestType, ResponseType> =
ClientWritableStream<RequestType> & ClientReadableStream<ResponseType>;
@ -78,9 +89,9 @@ function setUpReadableStream<ResponseType>(
call.on('status', (status: StatusObject) => {
stream.emit('status', status);
if (status.code !== Status.OK) {
const error = new ServiceErrorImpl(status.details);
error.code = status.code;
error.metadata = status.metadata;
const statusName = _.invert(Status)[status.code];
const message: string = `${status.code} ${statusName}: ${status.details}`;
const error: ServiceError = Object.assign(new Error(status.details), status);
stream.emit('error', error);
}
});

View File

@ -14,6 +14,8 @@ import {FilterStackFactory} from './filter-stack';
import {Metadata, MetadataObject} from './metadata';
import { MetadataStatusFilterFactory } from './metadata-status-filter';
const { version: clientVersion } = require('../../package');
const IDLE_TIMEOUT_MS = 300000;
const MIN_CONNECT_TIMEOUT_MS = 20000;
@ -28,13 +30,19 @@ const {
HTTP2_HEADER_METHOD,
HTTP2_HEADER_PATH,
HTTP2_HEADER_SCHEME,
HTTP2_HEADER_TE
HTTP2_HEADER_TE,
HTTP2_HEADER_USER_AGENT
} = http2.constants;
/**
* An interface that contains options used when initializing a Channel instance.
*/
export interface ChannelOptions { [index: string]: string|number; }
export interface ChannelOptions {
'grpc.ssl_target_name_override': string;
'grpc.primary_user_agent': string;
'grpc.secondary_user_agent': string;
[key: string]: string | number;
}
export enum ConnectivityState {
CONNECTING,
@ -72,6 +80,7 @@ export interface Channel extends EventEmitter {
}
export class Http2Channel extends EventEmitter implements Channel {
private readonly userAgent: string;
private readonly authority: url.URL;
private connectivityState: ConnectivityState = ConnectivityState.IDLE;
/* For now, we have up to one subchannel, which will exist as long as we are
@ -112,7 +121,7 @@ export class Http2Channel extends EventEmitter implements Channel {
case ConnectivityState.IDLE:
case ConnectivityState.SHUTDOWN:
if (this.subChannel) {
this.subChannel.shutdown({graceful: true});
this.subChannel.close();
this.subChannel.removeListener('connect', this.subChannelConnectCallback);
this.subChannel.removeListener('close', this.subChannelCloseCallback);
this.subChannel = null;
@ -145,7 +154,7 @@ export class Http2Channel extends EventEmitter implements Channel {
// to override the target hostname when checking server identity.
// This option is used for testing only.
if (this.options['grpc.ssl_target_name_override']) {
const sslTargetNameOverride = this.options['grpc.ssl_target_name_override'] as string;
const sslTargetNameOverride = this.options['grpc.ssl_target_name_override']!;
connectionOptions.checkServerIdentity = (host: string, cert: PeerCertificate): Error | undefined => {
return checkServerIdentity(sslTargetNameOverride, cert);
}
@ -159,7 +168,7 @@ export class Http2Channel extends EventEmitter implements Channel {
MIN_CONNECT_TIMEOUT_MS);
let connectionTimerId: NodeJS.Timer = setTimeout(() => {
// This should trigger the 'close' event, which will send us back to TRANSIENT_FAILURE
subChannel.shutdown();
subChannel.close();
}, connectionTimeout);
this.subChannelConnectCallback = () => {
// Connection succeeded
@ -181,7 +190,7 @@ export class Http2Channel extends EventEmitter implements Channel {
constructor(
address: string,
public readonly credentials: ChannelCredentials,
private readonly options: ChannelOptions) {
private readonly options: Partial<ChannelOptions>) {
super();
if (credentials.getSecureContext() === null) {
this.authority = new url.URL(`http://${address}`);
@ -199,16 +208,24 @@ export class Http2Channel extends EventEmitter implements Channel {
* a value of type NodeJS.Timer. */
this.backoffTimerId = setTimeout(() => {}, 0);
clearTimeout(this.backoffTimerId);
// Build user-agent string.
this.userAgent = [
options['grpc.primary_user_agent'],
`grpc-node-js/${clientVersion}`,
options['grpc.secondary_user_agent']
].filter(e => e).join(' '); // remove falsey values first
}
private startHttp2Stream(
methodName: string, stream: Http2CallStream, metadata: Metadata) {
let finalMetadata: Promise<Metadata> =
stream.filterStack.sendMetadata(Promise.resolve(metadata));
stream.filterStack.sendMetadata(Promise.resolve(metadata.clone()));
Promise.all([finalMetadata, this.connect()])
.then(([metadataValue]) => {
let headers = metadataValue.toHttp2Headers();
headers[HTTP2_HEADER_AUTHORITY] = this.authority.hostname;
headers[HTTP2_HEADER_USER_AGENT] = this.userAgent;
headers[HTTP2_HEADER_CONTENT_TYPE] = 'application/grpc';
headers[HTTP2_HEADER_METHOD] = 'POST';
headers[HTTP2_HEADER_PATH] = methodName;
@ -217,9 +234,7 @@ export class Http2Channel extends EventEmitter implements Channel {
if (this.connectivityState === ConnectivityState.READY) {
const session: http2.ClientHttp2Session = this.subChannel!;
// Prevent the HTTP/2 session from keeping the process alive.
// TODO(kjin): Monitor nodejs/node#17620, which adds unref
// directly to the Http2Session object.
session.socket.unref();
session.unref();
stream.attachHttp2Stream(session.request(headers));
} else {
/* In this case, we lost the connection while finalizing

View File

@ -1,7 +1,7 @@
import {once} from 'lodash';
import {URL} from 'url';
import {ClientDuplexStream, ClientDuplexStreamImpl, ClientReadableStream, ClientReadableStreamImpl, ClientUnaryCall, ClientUnaryCallImpl, ClientWritableStream, ClientWritableStreamImpl, ServiceError, ServiceErrorImpl} from './call';
import {ClientDuplexStream, ClientDuplexStreamImpl, ClientReadableStream, ClientReadableStreamImpl, ClientUnaryCall, ClientUnaryCallImpl, ClientWritableStream, ClientWritableStreamImpl, ServiceError} from './call';
import {CallOptions, CallStream, StatusObject, WriteObject} from './call-stream';
import {Channel, ChannelOptions, Http2Channel} from './channel';
import {ChannelCredentials} from './channel-credentials';
@ -16,14 +16,7 @@ export class Client {
private readonly channel: Channel;
constructor(
address: string, credentials: ChannelCredentials,
options: ChannelOptions = {}) {
if (options['grpc.primary_user_agent']) {
options['grpc.primary_user_agent'] += ' ';
} else {
options['grpc.primary_user_agent'] = '';
}
// TODO(murgatroid99): Figure out how to get version number
// options['grpc.primary_user_agent'] += 'grpc-node/' + version;
options: Partial<ChannelOptions> = {}) {
this.channel = new Http2Channel(address, credentials, options);
}
@ -35,7 +28,11 @@ export class Client {
void {
let cb: (error: Error|null) => void = once(callback);
let callbackCalled = false;
let timer: NodeJS.Timer | null = null;
this.channel.connect().then(() => {
if (timer) {
clearTimeout(timer);
}
cb(null);
});
if (deadline !== Infinity) {
@ -49,7 +46,7 @@ export class Client {
if (timeout < 0) {
timeout = 0;
}
setTimeout(() => {
timer = setTimeout(() => {
cb(new Error('Failed to connect before the deadline'));
}, timeout);
}
@ -83,9 +80,7 @@ export class Client {
if (status.code === Status.OK) {
callback(null, responseMessage as ResponseType);
} else {
const error = new ServiceErrorImpl(status.details);
error.code = status.code;
error.metadata = status.metadata;
const error: ServiceError = Object.assign(new Error(status.details), status);
callback(error);
}
});

View File

@ -20,6 +20,7 @@ function getDeadline(deadline: number) {
}
export class DeadlineFilter extends BaseFilter implements Filter {
private timer: NodeJS.Timer | null = null;
private deadline: number;
constructor(
private readonly channel: Http2Channel,
@ -37,10 +38,11 @@ export class DeadlineFilter extends BaseFilter implements Filter {
timeout = 0;
}
if (this.deadline !== Infinity) {
setTimeout(() => {
this.timer = setTimeout(() => {
callStream.cancelWithStatus(
Status.DEADLINE_EXCEEDED, 'Deadline exceeded');
}, timeout);
callStream.on('status', () => clearTimeout(this.timer as NodeJS.Timer));
}
}

View File

@ -181,6 +181,11 @@ export class Metadata {
});
return result;
}
// For compatibility with the other Metadata implementation
private _getCoreRepresentation() {
return this.internalRepr;
}
/**
* Returns a new Metadata object based fields in a given IncomingHttpHeaders
@ -196,7 +201,8 @@ export class Metadata {
result.add(key, Buffer.from(value, 'base64'));
});
} else if (values !== undefined) {
result.add(key, Buffer.from(values, 'base64'));
values.split(',').map(v => v.trim()).forEach(v =>
result.add(key, Buffer.from(v, 'base64')));
}
} else {
if (Array.isArray(values)) {
@ -204,7 +210,8 @@ export class Metadata {
result.add(key, value);
});
} else if (values !== undefined) {
result.add(key, values);
values.split(',').map(v => v.trim()).forEach(v =>
result.add(key, v));
}
}
});

View File

@ -39,10 +39,13 @@ class ClientHttp2StreamMock extends stream.Duplex implements http2.ClientHttp2St
bytesRead = 0;
dataFrame = 0;
aborted: boolean = false;
closed: boolean = false;
destroyed: boolean = false;
pending: boolean = false;
rstCode: number = 0;
session: http2.Http2Session = {} as any;
state: http2.StreamState = {} as any;
close = mockFunction;
priority = mockFunction;
rstStream = mockFunction;
rstWithNoError = mockFunction;