grpc-js-core: handle eos headers as trailers

This commit is contained in:
Kelvin Jin 2017-12-18 18:20:19 -08:00
parent da73a0fff0
commit aea5bf82e5
4 changed files with 125 additions and 54 deletions

View File

@ -105,6 +105,13 @@ export class Http2CallStream extends Duplex implements CallStream {
// Status code mapped from :status. To be used if grpc-status is not received
private mappedStatusCode: Status = Status.UNKNOWN;
// Promise objects that are re-assigned to resolving promises when headers
// or trailers received. Processing headers/trailers is asynchronous, so we
// can use these objects to await their completion. This helps us establish
// order of precedence when obtaining the status of the call.
private handlingHeaders = Promise.resolve();
private handlingTrailers = Promise.resolve();
// This is populated (non-null) if and only if the call has ended
private finalStatus: StatusObject|null = null;
@ -116,6 +123,11 @@ export class Http2CallStream extends Duplex implements CallStream {
this.filterStack = filterStackFactory.createFilter(this);
}
/**
* On first call, emits a 'status' event with the given StatusObject.
* Subsequent calls are no-ops.
* @param status The status of the call.
*/
private endCall(status: StatusObject): void {
if (this.finalStatus === null) {
this.finalStatus = status;
@ -135,12 +147,46 @@ export class Http2CallStream extends Duplex implements CallStream {
return canPush;
}
private handleTrailers(headers: http2.IncomingHttpHeaders) {
let code: Status = this.mappedStatusCode;
let details = '';
let metadata: Metadata;
try {
metadata = Metadata.fromHttp2Headers(headers);
} catch (e) {
metadata = new Metadata();
}
let status: StatusObject = {code, details, metadata};
this.handlingTrailers = (async () => {
let finalStatus;
try {
// Attempt to assign final status.
finalStatus = await this.filterStack.receiveTrailers(Promise.resolve(status));
} catch (error) {
await this.handlingHeaders;
// This is a no-op if the call was already ended when handling headers.
this.endCall({
code: Status.INTERNAL,
details: 'Failed to process received status',
metadata: new Metadata()
});
return;
}
// It's possible that headers were received but not fully handled yet.
// Give the headers handler an opportunity to end the call first,
// if an error occurred.
await this.handlingHeaders;
// This is a no-op if the call was already ended when handling headers.
this.endCall(finalStatus);
})();
}
attachHttp2Stream(stream: http2.ClientHttp2Stream): void {
if (this.finalStatus !== null) {
stream.rstWithCancel();
} else {
this.http2Stream = stream;
stream.on('response', (headers) => {
stream.on('response', (headers, flags) => {
switch (headers[HTTP2_HEADER_STATUS]) {
// TODO(murgatroid99): handle 100 and 101
case '400':
@ -166,57 +212,27 @@ export class Http2CallStream extends Duplex implements CallStream {
}
delete headers[HTTP2_HEADER_STATUS];
delete headers[HTTP2_HEADER_CONTENT_TYPE];
let metadata: Metadata;
try {
metadata = Metadata.fromHttp2Headers(headers);
} catch (e) {
this.cancelWithStatus(Status.UNKNOWN, e.message);
return;
}
this.filterStack.receiveMetadata(Promise.resolve(metadata))
.then(
(finalMetadata) => {
this.emit('metadata', finalMetadata);
},
(error) => {
this.cancelWithStatus(Status.UNKNOWN, error.message);
});
});
stream.on('trailers', (headers: http2.IncomingHttpHeaders) => {
let code: Status = this.mappedStatusCode;
let details = '';
if (typeof headers['grpc-status'] === 'string') {
let receivedCode = Number(headers['grpc-status']);
if (receivedCode in Status) {
code = receivedCode;
} else {
code = Status.UNKNOWN;
if (flags & http2.constants.NGHTTP2_FLAG_END_STREAM) {
this.handleTrailers(headers);
} else {
let metadata: Metadata;
try {
metadata = Metadata.fromHttp2Headers(headers);
} catch (error) {
this.endCall({code: Status.UNKNOWN, details: error.message, metadata: new Metadata()});
return;
}
delete headers['grpc-status'];
this.handlingHeaders =
this.filterStack.receiveMetadata(Promise.resolve(metadata))
.then((finalMetadata) => {
this.emit('metadata', finalMetadata);
}).catch((error) => {
this.destroyHttp2Stream();
this.endCall({code: Status.UNKNOWN, details: error.message, metadata: new Metadata()});
});
}
if (typeof headers['grpc-message'] === 'string') {
details = decodeURI(headers['grpc-message'] as string);
}
let metadata: Metadata;
try {
metadata = Metadata.fromHttp2Headers(headers);
} catch (e) {
metadata = new Metadata();
}
let status: StatusObject = {code, details, metadata};
this.filterStack.receiveTrailers(Promise.resolve(status))
.then(
(finalStatus) => {
this.endCall(finalStatus);
},
(error) => {
this.endCall({
code: Status.INTERNAL,
details: 'Failed to process received status',
metadata: new Metadata()
});
});
});
stream.on('trailers', this.handleTrailers.bind(this));
stream.on('data', (data) => {
let readHead = 0;
let canPush = true;
@ -278,7 +294,7 @@ export class Http2CallStream extends Duplex implements CallStream {
this.unpushedReadMessages.push(null);
}
});
stream.on('streamClosed', (errorCode) => {
stream.on('streamClosed', async (errorCode) => {
let code: Status;
let details = '';
switch (errorCode) {
@ -299,6 +315,13 @@ export class Http2CallStream extends Duplex implements CallStream {
default:
code = Status.INTERNAL;
}
// This guarantees that if trailers were received, the value of the
// 'grpc-status' header takes precedence for emitted status data.
await this.handlingTrailers;
// This is a no-op if trailers were received at all.
// This is OK, because status codes emitted here correspond to more
// catastrophic issues that prevent us from receiving trailers in the
// first place.
this.endCall({code: code, details: details, metadata: new Metadata()});
});
stream.on('error', (err: Error) => {
@ -323,8 +346,7 @@ export class Http2CallStream extends Duplex implements CallStream {
}
}
cancelWithStatus(status: Status, details: string): void {
this.endCall({code: status, details: details, metadata: new Metadata()});
private destroyHttp2Stream() {
// The http2 stream could already have been destroyed if cancelWithStatus
// is called in response to an internal http2 error.
if (this.http2Stream !== null && !this.http2Stream.destroyed) {
@ -334,6 +356,16 @@ export class Http2CallStream extends Duplex implements CallStream {
}
}
cancelWithStatus(status: Status, details: string): void {
this.destroyHttp2Stream();
(async () => {
// If trailers are currently being processed, the call should be ended
// by handleTrailers instead.
await this.handlingTrailers;
this.endCall({code: status, details: details, metadata: new Metadata()});
})();
}
getDeadline(): Deadline {
return this.options.deadline;
}

View File

@ -12,6 +12,7 @@ import {Status} from './constants';
import {DeadlineFilterFactory} from './deadline-filter';
import {FilterStackFactory} from './filter-stack';
import {Metadata, MetadataObject} from './metadata';
import { MetadataStatusFilterFactory } from './metadata-status-filter';
const IDLE_TIMEOUT_MS = 300000;
@ -177,7 +178,9 @@ export class Http2Channel extends EventEmitter implements Channel {
}
this.filterStackFactory = new FilterStackFactory([
new CompressionFilterFactory(this),
new CallCredentialsFilterFactory(this), new DeadlineFilterFactory(this)
new CallCredentialsFilterFactory(this),
new DeadlineFilterFactory(this),
new MetadataStatusFilterFactory(this)
]);
this.currentBackoffDeadline = new Date();
/* The only purpose of these lines is to ensure that this.backoffTimerId has

View File

@ -0,0 +1,36 @@
import {CallStream} from './call-stream';
import {Channel} from './channel';
import {BaseFilter, Filter, FilterFactory} from './filter';
import {StatusObject} from './call-stream';
import {Status} from './constants';
export class MetadataStatusFilter extends BaseFilter implements Filter {
async receiveTrailers(status: Promise<StatusObject>): Promise<StatusObject> {
let { code, details, metadata } = await status;
if (code !== Status.UNKNOWN) {
// we already have a known status, so don't assign a new one.
return { code, details, metadata };
}
const metadataMap = metadata.getMap();
if (typeof metadataMap['grpc-status'] === 'string') {
let receivedCode = Number(metadataMap['grpc-status']);
if (receivedCode in Status) {
code = receivedCode;
}
metadata.remove('grpc-status');
}
if (typeof metadataMap['grpc-message'] === 'string') {
details = decodeURI(metadataMap['grpc-message'] as string);
metadata.remove('grpc-message');
}
return { code, details, metadata };
}
}
export class MetadataStatusFilterFactory implements
FilterFactory<MetadataStatusFilter> {
constructor(private readonly channel: Channel) {}
createFilter(callStream: CallStream): MetadataStatusFilter {
return new MetadataStatusFilter();
}
}

View File

@ -26,7 +26,7 @@ function isLegalKey(key: string): boolean {
}
function isLegalNonBinaryValue(value: string): boolean {
return !!value.match(/^[ -~]+$/);
return !!value.match(/^[ -~]*$/);
}
function isBinaryKey(key: string): boolean {