From a1c5af652d2a4a9ddacc55f7109dd330bc3cfa61 Mon Sep 17 00:00:00 2001 From: murgatroid99 Date: Thu, 24 Aug 2017 12:29:28 -0700 Subject: [PATCH] Added most of the channel and call-stream implementations, plus filters --- src/call-credentials-filter.ts | 28 ++++ src/call-stream.ts | 228 ++++++++++++++++++++++++++++++++- src/channel.ts | 97 +++++++------- src/compression-filter.ts | 27 ++++ src/deadline-filter.ts | 55 ++++++++ src/filter-stack.ts | 26 ++++ src/filter.ts | 28 ++++ 7 files changed, 439 insertions(+), 50 deletions(-) create mode 100644 src/call-credentials-filter.ts create mode 100644 src/compression-filter.ts create mode 100644 src/deadline-filter.ts create mode 100644 src/filter-stack.ts create mode 100644 src/filter.ts diff --git a/src/call-credentials-filter.ts b/src/call-credentials-filter.ts new file mode 100644 index 00000000..7b10072d --- /dev/null +++ b/src/call-credentials-filter.ts @@ -0,0 +1,28 @@ +import {promisify} from 'util' +import {Filter} from './filter' +import {CallCredentials} from './call-credentials' + +export class CallCredentialsFilter extends BaseFilter implements Filter { + + private credsMetadata: Promise; + + constructor(credentials: CallCredentials) { + // TODO(murgatroid99): pass real options to generateMetadata + credsMetadata = util.promisify(credentials.generateMetadata.bind(credentials))({}); + } + + async sendMetadata(metadata: Promise) { + return (await metadata).merge(await this.credsMetadata); + } +} + +export class CallCredentialsFilterFactory implements FilterFactory { + private credentials: CallCredentials | null; + constructor(channel: Http2Channel) { + this.credentials = channel.credentials.getCallCredentials(); + } + + createFilter(callStream: CallStream): CallCredentialsFilter { + return new CallCredentialsFilter(this.credentials.compose(callStream.credentials)); + } +} diff --git a/src/call-stream.ts b/src/call-stream.ts index dcd4a111..f28d9789 100644 --- a/src/call-stream.ts +++ b/src/call-stream.ts @@ -4,6 +4,8 @@ import {CallCredentials} from './call-credentials'; import {Status} from './constants'; import {Metadata} from './metadata'; import {ObjectDuplex} from './object-stream'; +import {Filter} from './filter' +import {FilterStackFactory} from './filter-stack' export interface CallOptions { deadline?: Date|number; @@ -61,17 +63,239 @@ export interface CallStream extends ObjectDuplex { this; } +enum ReadState { + NO_DATA, + READING_SIZE, + READING_MESSAGE +} + export class Http2CallStream extends stream.Duplex implements CallStream { + private filterStack: Filter; + private statusEmitted: bool = false; + private http2Stream: ClientHttp2Stream | null = null; + private pendingRead: bool = false; + private pendingWrite: Buffer | null = null; + private pendingWriteCallback: Function | null = null; + private pendingFinalCallback: Function | null = null; + + private readState: ReadState = ReadState.NO_DATA; + private readCompressFlag: bool = false; + private readPartialSize: Buffer = Buffer.alloc(4); + private readSizeRemaining: number = 4; + private readMessageSize: number = 0; + private readPartialMessage: Buffer[] = []; + private readMessageRemaining = 0; + + private unpushedReadMessages: (Buffer | null)[] = []; + + // Status code mapped from :status. To be used if grpc-status is not received + private mappedStatusCode: Status = Status.UNKNOWN; + + constructor(public readonly methodName: string, public readonly options: CallOptions, + filterStackFactory: FilterStackFactory) { + this.filterStack = FilterStackFactory.createFilter(this); + } + + private endCall(status: StatusObject): void { + if (!this.statusEmitted) { + this.emit('status', {code: status, details: details, metadata: new Metadata()}); + this.statusEmitted = true; + } + } + attachHttp2Stream(stream: ClientHttp2Stream): void { - throw new Error('Not yet implemented'); + if (this.statusEmitted) { + // TODO(murgatroid99): Handle call end before http2 stream start + } else { + this.http2Stream = stream; + stream.on('response', (headers) => { + switch(headers[HTTP2_HEADER_STATUS]) { + // TODO(murgatroid99): handle 100 and 101 + case '400': + this.mappedStatusCode = Status.INTERNAL; + break; + case '401': + this.mappedStatusCode = Status.UNAUTHENTICATED; + break; + case '403': + this.mappedStatusCode = Status.PERMISSION_DENIED; + break; + case '404': + this.mappedStatusCode = Status.UNIMPLEMENTED; + break; + case '429': + case '502': + case '503': + case '504': + this.mappedStatusCode = Status.UNAVAILABLE; + break; + default: + this.mappedStatusCode = Status.UNKNOWN; + } + delete headers[HTTP2_HEADERS_STATUS]; + delete headers[HTTP2_HEADERS_CONTENT_TYPE]; + let metadata: Metadata; + try { + metadata = Metadata.fromHttp2Headers(headers); + } catch (e) { + this.cancelWithStatus(Status.UNKNOWN, e.message); + return; + } + this.filterStack.receiveMetadata(Promise.resolve(metadata)).then((finalMetadata) => { + this.emit('metadata', finalMetadata); + }, (error) => { + this.cancelWithStatus(Status.UNKNOWN, error.message); + }); + }); + stream.on('trailers', (headers) => { + let code: Status = this.mappedStatusCode; + if (headers.hasOwnProperty('grpc-status')) { + let receivedCode = Number(headers['grpc-status']); + if (possibleCode in Status) { + code = receivedCode; + } else { + code = Status.UNKNOWN; + } + delete headers['grpc-status']; + } + let details: string = ''; + if (headers.hasOwnProperty('grpc-message')) { + details = decodeURI(headers['grpc-message']); + } + let metadata: Metadata; + try { + metadata = Metadata.fromHttp2Headers(headers); + } catch (e) { + metadata = new Metadata(); + } + this.filterStack.receiveTrailers(Promise.resolve(status)).then((finalStatus) => { + this.endCall(finalStatus); + }, (error) => { + this.endCall({ + code: Status.INTERNAL, + details: 'Failed to process received status', + metadata: new Metadata(); + }); + }); + }); + stream.on('read', (data) => { + let readHead = 0; + let canPush = true; + while (readHead < data.length) { + switch(this.readState) { + case ReadState.NO_DATA: + readCompressFlag = (data.readUInt8(readHead) !== 0); + this.readState = ReadState.READING_SIZE; + this.readPartialSize.fill(0); + this.readSizeRemaining = 4; + this.readMessageSize = 0; + this.readMessageRemaining = 0; + this.readPartialMessage = []; + break; + case ReadState.READING_SIZE: + let toRead: number = Math.min(data.length - readHead, this.readSizeRemaining); + data.copy(readPartialSize, 4 - this.readSizeRemaining, readHead, readHead + toRead); + this.readSizeRemaining -= toRead; + readHead += toRead; + // readSizeRemaining >=0 here + if (this.readSizeRemaining === 0) { + this.readMessageSize = readPartialSize.readUInt32BE(0); + this.readMessageRemaining = this.readMessageSize; + this.readState = ReadState.READING_MESSAGE; + } + break; + case ReadSize.READING_MESSAGE: + let toRead: number = math.min(data.length - readHead, this.readMessageRemaining); + readPartialMessage.push(data.slice(readHead, readHead + toRead)); + this.readMessageRemaining -= toRead; + this.readHead += toRead; + // readMessageRemaining >=0 here + if (this.readMessageRemaining === 0) { + // At this point, we have read a full message + let messageBytes = Buffer.concat(readPartialMessage, readMessageSize); + // TODO(murgatroid99): Add receive message filters + if (canPush) { + if (!this.push(messageBytes)) { + canPush = false; + this.http2Stream.pause(); + } + } else { + this.unpushedReadMessages.push(messageBytes); + } + } + } + } + }); + stream.on('end', () => { + if (this.unpushedReadMessages.length === 0) { + this.push(null); + } else { + this.unpushedReadMessages.push(null); + } + }); + } } cancelWithStatus(status: Status, details: string): void { - throw new Error('Not yet implemented'); + this.endCall({code: status, details: details, metadata: new Metadata()}); + if (this.http2Stream !== null) { + /* TODO(murgatroid99): Determine if we want to send different RST_STREAM + * codes based on the status code */ + this.http2Stream.rstWithCancel(); + } } getPeer(): string { throw new Error('Not yet implemented'); } + + _read(size: number) { + if (this.http2Stream === null) { + this.pendingRead = true; + } else { + while (unpushedReadMessages.length > 0) { + let nextMessage = unpushedReadMessages.shift(); + let keepPushing = this.push(nextMessage); + if (nextMessage === null || (!keepPushing)) { + return; + } + } + /* Only resume reading from the http2Stream if we don't have any pending + * messages to emit, and we haven't gotten the signal to stop pushing + * messages */ + this.http2Stream.resume(); + } + } + + // Encode a message to the wire format + private encodeMessage(message: WriteObject): Buffer { + /* unsafeAlloc doesn't initiate the bytes in the buffer. We are explicitly + * overwriting every single byte, so that should be fine */ + let output: Buffer = Buffer.unsafeAlloc(message.length + 5); + // TODO(murgatroid99): handle compressed flag appropriately + output.writeUInt8(0, 0); + output.writeUint32BE(message.message.length, 1); + message.message.copy(output, 5); + return output; + } + + _write(chunk: WriteObject, encoding: string, cb: Function) { + // TODO(murgatroid99): Add send message filters + let encodedMessage = encodeMessage(chunk); + if (this.http2Stream === null) { + this.pendingWrite = encodedMessage; + this.pendingWriteCallback = cb; + } else { + this.http2Stream.write(encodedMessage, cb); + } + } + + _final(cb: Function) { + if (this.http2Stream === null) { + this.pendingFinalCallback = cb; + } else { + this.http2Stream.end(cb); + } + } } diff --git a/src/channel.ts b/src/channel.ts index 52745082..9f6e1d4b 100644 --- a/src/channel.ts +++ b/src/channel.ts @@ -5,7 +5,13 @@ import {IncomingHttpHeaders, OutgoingHttpHeaders} from 'http'; import * as url from 'url'; import {CallOptions, CallStream} from './call-stream'; import {ChannelCredentials} from './channel-credentials'; -import {Metadata} from './metadata'; +import {Metadata, MetadataObject} from './metadata'; +import {Status} from './constants' + +import {FilterStackFactory} from './filter-stack' +import {DeadlineFilterFactory} from './deadline-filter' +import {CallCredentialsFilterFactory} from './call-credentials-filter' +import {Http2FilterFactory} from './http2-filter' const IDLE_TIMEOUT_MS = 300000; @@ -39,7 +45,7 @@ export enum ConnectivityState { */ export interface Channel extends EventEmitter { createStream(methodName: string, metadata: OutgoingHttp2Headers, options: CallOptions): CallStream; - connect(): void; + connect(() => void): void; getConnectivityState(): ConnectivityState; close(): void; @@ -50,17 +56,6 @@ export interface Channel extends EventEmitter { prependListener(event: string, listener: Function): this; prependOnceListener(event: string, listener: Function): this; removeListener(event: string, listener: Function): this; - - addListener(event: 'connectivity_state_changed', listener: (state: ConnectivityState) => void): this; - emit(event: 'connectivity_state_changed', state: ConnectivityState): boolean; - on(event: 'connectivity_state_changed', listener: (state: ConnectivityState) => void): this; - once(event: 'connectivity_state_changed', listener: (state: ConnectivityState) => void): this; - prependListener(event: 'connectivity_state_changed', listener: (state: ConnectivityState) => void): - this; - prependOnceListener( - event: 'connectivity_state_changed', listener: (state: ConnectivityState) => void): this; - removeListener(event: 'connectivity_state_changed', listener: (state: ConnectivityState) => void): - this; } export class Http2Channel extends EventEmitter implements Channel { @@ -69,13 +64,13 @@ export class Http2Channel extends EventEmitter implements Channel { /* For now, we have up to one subchannel, which will exist as long as we are * connecting or trying to connect */ private subChannel : Http2Session | null; - private secureContext : SecureContext | null; private address : url.Url; + private filterStackFactory : FilterStackFactory; private transitionToState(newState: ConnectivityState): void { if (newState !== this.connectivityState) { this.connectivityState = newState; - this.emit('connectivity_state_changed', newState); + this.emit('connectivityStateChanged', newState); } } @@ -85,6 +80,9 @@ export class Http2Channel extends EventEmitter implements Channel { this.subChannel.on('connect', () => { this.transitionToState(ConnectivityState.READY); }); + this.subChannel.setTimeout(IDLE_TIMEOUT_MS, () => { + this.goIdle(); + }); } private goIdle(): void { @@ -92,59 +90,62 @@ export class Http2Channel extends EventEmitter implements Channel { this.transitionToState(ConnectivityState.IDLE); } - /* Reset the lastRpcActivity date to now, and kick the connectivity state - * machine out of idle */ private kickConnectivityState(): void { if (this.connectivityState === ConnectivityState.IDLE) { this.startConnecting(); - } else { - clearTimeout(this.idleTimeoutId); } - this.idleTimeoutId = setTimeout(() => { - this.goIdle(); - }, IDLE_TIMEOUT_MS); } - constructor(address: url.Url, - credentials: ChannelCredentials, + constructor(private readonly address: url.Url, + public readonly credentials: ChannelCredentials, private readonly options: ChannelOptions) { - this.secureContext = credentials.getSecureContext(); - if (this.secureContext === null) { + if (channelCredentials.getSecureContext() === null) { address.protocol = 'http'; } else { address.protocol = 'https'; } - this.address = address; + this.filterStackFactory = new FilterStackFactory([ + new CompressionFilterFactory(this), + new CallCredentialsFilterFactory(this), + new DeadlineFilterFactory(this) + ]); } - createStream(methodName: string, metadata: OutgoingHttpHeaders, options: CallOptions): CallStream { + createStream(methodName: string, metadata: Metadata, options: CallOptions): CallStream { if (this.connectivityState === ConnectivityState.SHUTDOWN) { throw new Error('Channel has been shut down'); } - this.kickConnectivityState(); - let stream: Http2CallStream = new Http2CallStream(); - metadata[HTTP2_HEADER_AUTHORITY] = this.address.hostname; - metadata[HTTP2_HEADER_METHOD] = 'POST'; - metadata[HTTP2_HEADER_PATH] = methodName; - metadata[HTTP2_HEADER_TE] = 'trailers'; - if (this.connectivityState === ConnectivityState.READY) { - stream.attachHttp2Stream(this.subchannel.request(metadata)); - } else { - let connectCb = (state) => { - if (state === ConnectivityState.READY) { - stream.attachHttp2Stream(this.subchannel.request(metadata)); - this.removeListener('connectivity_state_changed', connectCb); + let stream: Http2CallStream = new Http2CallStream(methodName, options, this.filterStackFactory); + let finalMetadata: Promise = stream.filterStack.sendMetadata(Promise.resolve(metadata)); + this.connect(() => { + finalMetadata.then((metadataValue) => { + let headers = metadataValue.toHttp2Headers(); + headers[HTTP2_HEADER_AUTHORITY] = this.address.hostname; + headers[HTTP2_HEADER_CONTENT_TYPE] = 'application/grpc'; + headers[HTTP2_HEADER_METHOD] = 'POST'; + headers[HTTP2_HEADER_PATH] = methodName; + headers[HTTP2_HEADER_TE] = 'trailers'; + if (stream.isOpen()) { + stream.attachHttp2Stream(this.subchannel.request(headers)); } - }; - this.on('connectivity_state_changed', connectCb); - } + }, (error) => { + stream.cancelWithStatus(Status.UNKNOWN, "Failed to generate metadata"); + }); + }); + return stream; } - connect(): void { - if (this.connectivityState === ConnectivityState.SHUTDOWN) { - throw new Error('Channel has been shut down'); - } + connect(callback: () => void): void { this.kickConnectivityState(); + if (this.connectivityState === ConnectivityState.READY) { + setImmediate(callback); + } else { + this.on('connectivityStateChanged', (newState) => { + if (newState === ConnectivityState.READY) { + callback(); + } + }); + } } getConnectivityState(): ConnectivityState{ diff --git a/src/compression-filter.ts b/src/compression-filter.ts new file mode 100644 index 00000000..83d31692 --- /dev/null +++ b/src/compression-filter.ts @@ -0,0 +1,27 @@ +import {Filter, BaseFilter} from './filter' +import {Metadata} from './metadata' + +export class CompressionFilter extends BaseFilter implements Filter { + constructor() {} + + async sendMetadata(metadata: Promise): Promise { + let headers: Metadata = await metadata; + headers.set('grpc-encoding', 'identity'); + headers.set('grpc-accept-encoding', 'identity'); + return headers; + } + + async receiveMetadata(metadata: Promise): Promise { + constructor(channel) {} + createFilter(callStream: CallStream): CompressionFilter { + return new CompressionFilter(); + } +} diff --git a/src/deadline-filter.ts b/src/deadline-filter.ts new file mode 100644 index 00000000..b0dc378e --- /dev/null +++ b/src/deadline-filter.ts @@ -0,0 +1,55 @@ +import {Filter} from './filter' +import {Status} from './constants' + +const units = [ + ['m', 1], + ['S', 1000], + ['M', 60 * 1000], + ['H', 60 * 60 * 1000] +] + +export class DeadlineFilter extends BaseFilter implements Filter { + private deadline; + constructor(private readonly channel: Channel, private readonly callStream: CallStream) { + let deadline = callStream.deadline; + this.deadline = deadline; + let now: number = (new Date()).getTime(); + let timeout = deadline - now; + if (timeout < 0) { + timeout = 0; + } + if (deadline !== Infinity) { + setTimeout(() => { + callStream.cancelWithStatus(Status.DEADLINE_EXCEEDED, 'Deadline exceeded'); + }, timeout); + } + } + + async sendMetadata(metadata: Promise) { + if (this.deadline === Infinity) { + return await metadata; + } + let timeoutString : Promise = new Promise((resolve, reject) => { + this.channel.connect(() => { + let now = (new Date()).getTime(); + let timeoutMs = this.deadline - now; + for (let [unit, factor] of units) { + let amount = timeoutMs / factor; + if (amount < 1e8) { + resolve(String(Math.ceil(amount)) + unit); + return; + } + } + }); + }); + (await metadata).set('grpc-timeout', await timeoutString); + } +} + +export class DeadlineFilterFactory implements FilterFactory { + constructor(private readonly channel: Http2Channel) {} + + createFilter(callStream: CallStream): DeadlineFilter { + return new DeadlineFilter(this.channel, callStream); + } +} diff --git a/src/filter-stack.ts b/src/filter-stack.ts new file mode 100644 index 00000000..28e7dd99 --- /dev/null +++ b/src/filter-stack.ts @@ -0,0 +1,26 @@ +import {flow, map} from 'lodash'; +import {Filter} from './filter' + +export class FilterStack implements Filter { + constructor(private readonly filters: Filter[]) {} + + async sendMetadata(metadata: Promise) { + return await flow(map(filters, (filter) => filter.sendMetadata.bind(filter)))(metadata); + } + + async receiveMetadata(metadata: Promise) { + return await flowRight(map(filters, (filter) => filter.receiveMetadata.bind(filter)))(metadata); + } + + async receiveTrailers(status: Promise): Promise { + return await flowRight(map(filters, (filter) => filter.receiveTrailers.bind(filter)))(status); + } +} + +export class FilterStackFactory implements FilterFactory { + constructor(private readonly factories: FilterFactory[]) {} + + createFilter(callStream: CallStream): FilterStack { + return new FilterStack(map(factories, (factory) => factory.createFilter(callStream))); + } +} diff --git a/src/filter.ts b/src/filter.ts new file mode 100644 index 00000000..7d6a7a29 --- /dev/null +++ b/src/filter.ts @@ -0,0 +1,28 @@ +import {Metadata} from './metadata' +import {WriteObject, CallStream} from './call-stream' + +export interface Filter { + async sendMetadata(metadata: Promise): Promise; + + async receiveMetadata(metadata: Promise): Promise; + + async receiveTrailers(status: Promise): Promise; +} + +export abstract class BaseFilter { + async sendMetadata(metadata: Promise): Promise { + return await metadata; + } + + async receiveMetadata(metadata: Promise): Promise { + return await metadata; + } + + async receiveTrailers(status: Promise): Promise { + return await status; + } +} + +export interface FilterFactory { + createFilter(callStream: CallStream): T; +}