From caa07ef8833c2dd5767b544d8b39532c8d255a12 Mon Sep 17 00:00:00 2001 From: murgatroid99 Date: Mon, 21 Oct 2019 18:04:17 -0700 Subject: [PATCH] Make some filter types synchronous --- packages/grpc-js/src/call-stream.ts | 72 +++++++------------ packages/grpc-js/src/compression-filter.ts | 11 ++- packages/grpc-js/src/filter-stack.ts | 8 +-- packages/grpc-js/src/filter.ts | 10 +-- .../grpc-js/src/metadata-status-filter.ts | 4 +- 5 files changed, 40 insertions(+), 65 deletions(-) diff --git a/packages/grpc-js/src/call-stream.ts b/packages/grpc-js/src/call-stream.ts index a53dbefa..645e9865 100644 --- a/packages/grpc-js/src/call-stream.ts +++ b/packages/grpc-js/src/call-stream.ts @@ -103,13 +103,6 @@ export class Http2CallStream extends Duplex implements Call { // Status code mapped from :status. To be used if grpc-status is not received private mappedStatusCode: Status = Status.UNKNOWN; - // Promise objects that are re-assigned to resolving promises when headers - // or trailers received. Processing headers/trailers is asynchronous, so we - // can use these objects to await their completion. This helps us establish - // order of precedence when obtaining the status of the call. - private handlingHeaders = Promise.resolve(); - private handlingTrailers = Promise.resolve(); - // This is populated (non-null) if and only if the call has ended private finalStatus: StatusObject | null = null; @@ -224,30 +217,21 @@ export class Http2CallStream extends Duplex implements Call { metadata = new Metadata(); } const status: StatusObject = { code, details, metadata }; - this.handlingTrailers = (async () => { - let finalStatus; - try { - // Attempt to assign final status. - finalStatus = await this.filterStack.receiveTrailers( - Promise.resolve(status) - ); - } catch (error) { - await this.handlingHeaders; - // This is a no-op if the call was already ended when handling headers. - this.endCall({ - code: Status.INTERNAL, - details: 'Failed to process received status', - metadata: new Metadata(), - }); - return; - } - // It's possible that headers were received but not fully handled yet. - // Give the headers handler an opportunity to end the call first, - // if an error occurred. - await this.handlingHeaders; + let finalStatus; + try { + // Attempt to assign final status. + finalStatus = this.filterStack.receiveTrailers(status); + } catch (error) { // This is a no-op if the call was already ended when handling headers. - this.endCall(finalStatus); - })(); + this.endCall({ + code: Status.INTERNAL, + details: 'Failed to process received status', + metadata: new Metadata(), + }); + return; + } + // This is a no-op if the call was already ended when handling headers. + this.endCall(finalStatus); } attachHttp2Stream(stream: http2.ClientHttp2Stream, subchannel: Subchannel): void { @@ -297,19 +281,17 @@ export class Http2CallStream extends Duplex implements Call { }); return; } - this.handlingHeaders = this.filterStack - .receiveMetadata(Promise.resolve(metadata)) - .then(finalMetadata => { - this.emit('metadata', finalMetadata); - }) - .catch(error => { - this.destroyHttp2Stream(); - this.endCall({ - code: Status.UNKNOWN, - details: error.message, - metadata: new Metadata(), - }); + try { + const finalMetadata = this.filterStack.receiveMetadata(metadata); + this.emit('metadata', finalMetadata); + } catch (error) { + this.destroyHttp2Stream(); + this.endCall({ + code: Status.UNKNOWN, + details: error.message, + metadata: new Metadata(), }); + } } }); stream.on('trailers', this.handleTrailers.bind(this)); @@ -346,9 +328,6 @@ export class Http2CallStream extends Duplex implements Call { default: code = Status.INTERNAL; } - // This guarantees that if trailers were received, the value of the - // 'grpc-status' header takes precedence for emitted status data. - await this.handlingTrailers; // This is a no-op if trailers were received at all. // This is OK, because status codes emitted here correspond to more // catastrophic issues that prevent us from receiving trailers in the @@ -392,9 +371,6 @@ export class Http2CallStream extends Duplex implements Call { cancelWithStatus(status: Status, details: string): void { this.destroyHttp2Stream(); (async () => { - // If trailers are currently being processed, the call should be ended - // by handleTrailers instead. - await this.handlingTrailers; this.endCall({ code: status, details, metadata: new Metadata() }); })(); } diff --git a/packages/grpc-js/src/compression-filter.ts b/packages/grpc-js/src/compression-filter.ts index 34b7574f..b03a7d06 100644 --- a/packages/grpc-js/src/compression-filter.ts +++ b/packages/grpc-js/src/compression-filter.ts @@ -176,18 +176,17 @@ export class CompressionFilter extends BaseFilter implements Filter { return headers; } - async receiveMetadata(metadata: Promise): Promise { - const headers: Metadata = await metadata; - const receiveEncoding: MetadataValue[] = headers.get('grpc-encoding'); + receiveMetadata(metadata: Metadata): Metadata { + const receiveEncoding: MetadataValue[] = metadata.get('grpc-encoding'); if (receiveEncoding.length > 0) { const encoding: MetadataValue = receiveEncoding[0]; if (typeof encoding === 'string') { this.receiveCompression = getCompressionHandler(encoding); } } - headers.remove('grpc-encoding'); - headers.remove('grpc-accept-encoding'); - return headers; + metadata.remove('grpc-encoding'); + metadata.remove('grpc-accept-encoding'); + return metadata; } async sendMessage(message: Promise): Promise { diff --git a/packages/grpc-js/src/filter-stack.ts b/packages/grpc-js/src/filter-stack.ts index a15dcf43..3b6f16ba 100644 --- a/packages/grpc-js/src/filter-stack.ts +++ b/packages/grpc-js/src/filter-stack.ts @@ -32,8 +32,8 @@ export class FilterStack implements Filter { return result; } - receiveMetadata(metadata: Promise) { - let result: Promise = metadata; + receiveMetadata(metadata: Metadata) { + let result: Metadata = metadata; for (let i = this.filters.length - 1; i >= 0; i--) { result = this.filters[i].receiveMetadata(result); @@ -62,8 +62,8 @@ export class FilterStack implements Filter { return result; } - receiveTrailers(status: Promise): Promise { - let result: Promise = status; + receiveTrailers(status: StatusObject): StatusObject { + let result: StatusObject = status; for (let i = this.filters.length - 1; i >= 0; i--) { result = this.filters[i].receiveTrailers(result); diff --git a/packages/grpc-js/src/filter.ts b/packages/grpc-js/src/filter.ts index 9d437a67..c1e412ae 100644 --- a/packages/grpc-js/src/filter.ts +++ b/packages/grpc-js/src/filter.ts @@ -25,21 +25,21 @@ import { Metadata } from './metadata'; export interface Filter { sendMetadata(metadata: Promise): Promise; - receiveMetadata(metadata: Promise): Promise; + receiveMetadata(metadata: Metadata): Metadata; sendMessage(message: Promise): Promise; receiveMessage(message: Promise): Promise; - receiveTrailers(status: Promise): Promise; + receiveTrailers(status: StatusObject): StatusObject; } -export abstract class BaseFilter { +export abstract class BaseFilter implements Filter { async sendMetadata(metadata: Promise): Promise { return metadata; } - async receiveMetadata(metadata: Promise): Promise { + receiveMetadata(metadata: Metadata): Metadata { return metadata; } @@ -51,7 +51,7 @@ export abstract class BaseFilter { return message; } - async receiveTrailers(status: Promise): Promise { + receiveTrailers(status: StatusObject): StatusObject { return status; } } diff --git a/packages/grpc-js/src/metadata-status-filter.ts b/packages/grpc-js/src/metadata-status-filter.ts index 5ca401f7..f3879d7a 100644 --- a/packages/grpc-js/src/metadata-status-filter.ts +++ b/packages/grpc-js/src/metadata-status-filter.ts @@ -22,9 +22,9 @@ import { Status } from './constants'; import { BaseFilter, Filter, FilterFactory } from './filter'; export class MetadataStatusFilter extends BaseFilter implements Filter { - async receiveTrailers(status: Promise): Promise { + receiveTrailers(status: StatusObject): StatusObject { // tslint:disable-next-line:prefer-const - let { code, details, metadata } = await status; + let { code, details, metadata } = status; if (code !== Status.UNKNOWN) { // we already have a known status, so don't assign a new one. return { code, details, metadata };