diff --git a/packages/grpc-js-core/gulpfile.js b/packages/grpc-js-core/gulpfile.js index 47bd097f..5416b30f 100644 --- a/packages/grpc-js-core/gulpfile.js +++ b/packages/grpc-js-core/gulpfile.js @@ -80,7 +80,7 @@ function makeCompileFn(globs) { */ gulp.task('js.core.lint', 'Emits linting errors found in src/ and test/.', () => { const program = require('tslint').Linter.createProgram(tsconfigPath); - gulp.src([`${srcDir}/**/*.ts`, `${srcDir}/**/*.ts`]) + gulp.src([`${srcDir}/**/*.ts`, `${testDir}/**/*.ts`]) .pipe(tslint({ configuration: tslintPath, formatter: 'codeFrame', diff --git a/packages/grpc-js-core/src/call-stream.ts b/packages/grpc-js-core/src/call-stream.ts index 0c7b4b7c..51c58756 100644 --- a/packages/grpc-js-core/src/call-stream.ts +++ b/packages/grpc-js-core/src/call-stream.ts @@ -81,6 +81,8 @@ enum ReadState { READING_MESSAGE } +const emptyBuffer = Buffer.alloc(0); + export class Http2CallStream extends Duplex implements CallStream { public filterStack: Filter; private statusEmitted = false; @@ -121,6 +123,18 @@ export class Http2CallStream extends Duplex implements CallStream { } } + private tryPush(messageBytes: Buffer, canPush: boolean): boolean { + if (canPush) { + if (!this.push(messageBytes)) { + canPush = false; + (this.http2Stream as http2.ClientHttp2Stream).pause(); + } + } else { + this.unpushedReadMessages.push(messageBytes); + } + return canPush; + } + attachHttp2Stream(stream: http2.ClientHttp2Stream): void { if (this.finalStatus !== null) { stream.rstWithCancel(); @@ -230,7 +244,12 @@ export class Http2CallStream extends Duplex implements CallStream { if (this.readSizeRemaining === 0) { this.readMessageSize = this.readPartialSize.readUInt32BE(0); this.readMessageRemaining = this.readMessageSize; - this.readState = ReadState.READING_MESSAGE; + if (this.readMessageRemaining > 0) { + this.readState = ReadState.READING_MESSAGE; + } else { + canPush = this.tryPush(emptyBuffer, canPush); + this.readState = ReadState.NO_DATA; + } } break; case ReadState.READING_MESSAGE: @@ -246,14 +265,7 @@ export class Http2CallStream extends Duplex implements CallStream { const messageBytes = Buffer.concat( this.readPartialMessage, this.readMessageSize); // TODO(murgatroid99): Add receive message filters - if (canPush) { - if (!this.push(messageBytes)) { - canPush = false; - (this.http2Stream as http2.ClientHttp2Stream).pause(); - } - } else { - this.unpushedReadMessages.push(messageBytes); - } + canPush = this.tryPush(messageBytes, canPush); this.readState = ReadState.NO_DATA; } } diff --git a/packages/grpc-js-core/test/common.ts b/packages/grpc-js-core/test/common.ts index 7fbab022..487ced1f 100644 --- a/packages/grpc-js-core/test/common.ts +++ b/packages/grpc-js-core/test/common.ts @@ -5,6 +5,14 @@ export function mockFunction(): never { } export namespace assert2 { + const toCall = new Map<() => void, number>(); + const afterCallsQueue: Array<() => void> = []; + + /** + * Assert that the given function doesn't throw an error, and then return + * its value. + * @param fn The function to evaluate. + */ export function noThrowAndReturn(fn: () => T): T { try { return fn(); @@ -13,4 +21,59 @@ export namespace assert2 { throw e; // for type safety only } } + + /** + * Helper function that returns true when every function wrapped with + * mustCall has been called. + */ + function mustCallsSatisfied(): boolean { + let result = true; + toCall.forEach((value) => { + result = result && value === 0; + }); + return result; + } + + export function clearMustCalls(): void { + afterCallsQueue.length = 0; + } + + /** + * Wraps a function to keep track of whether it was called or not. + * @param fn The function to wrap. + */ + export function mustCall(fn: (...args: any[]) => T): (...args: any[]) => T { + const existingValue = toCall.get(fn); + if (existingValue !== undefined) { + toCall.set(fn, existingValue + 1); + } else { + toCall.set(fn, 1); + } + return (...args: any[]) => { + const result = fn(...args); + const existingValue = toCall.get(fn); + if (existingValue !== undefined) { + toCall.set(fn, existingValue - 1); + } + if (mustCallsSatisfied()) { + afterCallsQueue.forEach(fn => fn()); + afterCallsQueue.length = 0; + } + return result; + }; + } + + /** + * Calls the given function when every function that was wrapped with + * mustCall has been called. + * @param fn The function to call once all mustCall-wrapped functions have + * been called. + */ + export function afterMustCallsSatisfied(fn: () => void): void { + if (!mustCallsSatisfied()) { + afterCallsQueue.push(fn); + } else { + fn(); + } + } } diff --git a/packages/grpc-js-core/test/test-call-stream.ts b/packages/grpc-js-core/test/test-call-stream.ts new file mode 100644 index 00000000..e7126d62 --- /dev/null +++ b/packages/grpc-js-core/test/test-call-stream.ts @@ -0,0 +1,298 @@ +import * as assert from 'assert'; +import { CallCredentials } from '../src/call-credentials'; +import { Http2CallStream } from '../src/call-stream'; +import { mockFunction, assert2 } from './common'; +import { Status } from '../src/constants'; +import { EventEmitter } from 'events'; +import { FilterStackFactory } from '../src/filter-stack'; +import * as http2 from 'http2'; +import { forOwn, range } from 'lodash'; +import { Metadata } from '../src/metadata'; +import * as stream from 'stream'; + +interface DataFrames { + payload: Buffer, + frameLengths: number[] +} + +const { + HTTP2_HEADER_STATUS +} = http2.constants; + +function serialize(data: string): Buffer { + const header: Buffer = Buffer.alloc(5); + header.writeUInt8(0, 0); // TODO: Uncompressed only + header.writeInt32BE(data.length, 1); + return Buffer.concat([header, Buffer.from(data, 'utf8')]); +} + +class ClientHttp2StreamMock extends stream.Duplex implements http2.ClientHttp2Stream { + constructor(private readonly dataFrames: DataFrames) { + super(); + } + emitResponse(responseCode: number, metadata?: Metadata) { + this.emit('response', { + [HTTP2_HEADER_STATUS]: responseCode, + ...metadata ? metadata.toHttp2Headers() : {} + }); + } + bytesRead = 0; + dataFrame = 0; + aborted: boolean; + destroyed: boolean; + rstCode: number; + session: http2.Http2Session; + state: http2.StreamState; + priority = mockFunction; + rstStream = mockFunction; + rstWithNoError = mockFunction; + rstWithProtocolError = mockFunction; + rstWithCancel = mockFunction; + rstWithRefuse = mockFunction; + rstWithInternalError = mockFunction; + setTimeout = mockFunction; + _read() { + if (this.dataFrame === this.dataFrames.frameLengths.length) { + if (this.bytesRead < this.dataFrames.payload.length) { + this.push(this.dataFrames.payload.slice( + this.bytesRead, this.dataFrames.payload.length)); + } + this.push(null); + return; + } + const from = this.bytesRead; + this.bytesRead += this.dataFrames.frameLengths[this.dataFrame++]; + this.push(this.dataFrames.payload.slice(from, this.bytesRead)); + }; + _write(chunk: Buffer, encoding: string, cb: Function) { + this.emit('write', chunk); + cb(); + } +} + +describe('CallStream', () => { + const callStreamArgs = { + deadline: Infinity, + credentials: CallCredentials.createEmpty(), + flags: 0 + }; + const filterStackFactory = new FilterStackFactory([]); + const message = 'eat this message'; // 16 bytes + + beforeEach(() => { + assert2.clearMustCalls(); + }); + + it('should emit a metadata event when it receives a response event', (done) => { + const responseMetadata = new Metadata(); + responseMetadata.add('key', 'value'); + const callStream = new Http2CallStream('foo', callStreamArgs, filterStackFactory); + + const http2Stream = new ClientHttp2StreamMock({ + payload: Buffer.alloc(0), + frameLengths: [] + }); + callStream.once('metadata', assert2.mustCall((metadata) => { + assert.deepStrictEqual(metadata.get('key'), ['value']); + })); + callStream.attachHttp2Stream(http2Stream); + http2Stream.emitResponse(200, responseMetadata); + assert2.afterMustCallsSatisfied(done); + }); + + it('should end a call with an error if a stream was closed', (done) => { + const c = http2.constants; + const s = Status; + const errorCodeMapping = { + [c.NGHTTP2_NO_ERROR]: s.INTERNAL, + [c.NGHTTP2_PROTOCOL_ERROR]: s.INTERNAL, + [c.NGHTTP2_INTERNAL_ERROR]: s.INTERNAL, + [c.NGHTTP2_FLOW_CONTROL_ERROR]: s.INTERNAL, + [c.NGHTTP2_SETTINGS_TIMEOUT]: s.INTERNAL, + [c.NGHTTP2_STREAM_CLOSED]: null, + [c.NGHTTP2_FRAME_SIZE_ERROR]: s.INTERNAL, + [c.NGHTTP2_REFUSED_STREAM]: s.UNAVAILABLE, + [c.NGHTTP2_CANCEL]: s.CANCELLED, + [c.NGHTTP2_COMPRESSION_ERROR]: s.INTERNAL, + [c.NGHTTP2_CONNECT_ERROR]: s.INTERNAL, + [c.NGHTTP2_ENHANCE_YOUR_CALM]: s.RESOURCE_EXHAUSTED, + [c.NGHTTP2_INADEQUATE_SECURITY]: s.PERMISSION_DENIED + }; + forOwn(errorCodeMapping, (value: Status | null, key) => { + const callStream = new Http2CallStream('foo', callStreamArgs, filterStackFactory); + const http2Stream = new ClientHttp2StreamMock({ + payload: Buffer.alloc(0), + frameLengths: [] + }); + callStream.attachHttp2Stream(http2Stream); + if (value !== null) { + callStream.once('status', assert2.mustCall((status) => { + assert.strictEqual(status.code, value); + })); + } + http2Stream.emit('streamClosed', Number(key)); + }); + assert2.afterMustCallsSatisfied(done); + }); + + it('should have functioning getters', (done) => { + const callStream = new Http2CallStream('foo', callStreamArgs, filterStackFactory); + assert.strictEqual(callStream.getDeadline(), callStreamArgs.deadline); + assert.strictEqual(callStream.getCredentials(), callStreamArgs.credentials); + assert.strictEqual(callStream.getStatus(), null); + callStream.on('status', assert2.mustCall((status) => { + assert.strictEqual(status.code, Status.CANCELLED); + assert.strictEqual(status.details, ';)'); + assert.strictEqual(callStream.getStatus(), status); + })); + callStream.cancelWithStatus(Status.CANCELLED, ';)'); + // TODO: getPeer + assert2.afterMustCallsSatisfied(done); + }); + + describe('attachHttp2Stream', () => { + it('should handle an empty message', (done) => { + const callStream = new Http2CallStream('foo', callStreamArgs, filterStackFactory); + const http2Stream = new ClientHttp2StreamMock({ + payload: serialize(''), + frameLengths: [] + }); + callStream.once('data', assert2.mustCall((buffer) => { + assert.strictEqual(buffer.toString('utf8'), ''); + })); + callStream.attachHttp2Stream(http2Stream); + assert2.afterMustCallsSatisfied(done); + }); + + [ + { + description: 'all data is supplied in a single frame', + frameLengths: [] + }, + { + description: 'frames are split along header field delimiters', + frameLengths: [1, 4] + }, + { + description: 'portions of header fields are split between different frames', + frameLengths: [2, 1, 1, 4] + }, + { + description: 'frames are split into bytes', + frameLengths: range(0, 20).map(() => 1) + } + ].forEach((testCase: { description: string, frameLengths: number[] }) => { + it(`should handle a short message where ${testCase.description}`, (done) => { + const callStream = new Http2CallStream('foo', callStreamArgs, filterStackFactory); + const http2Stream = new ClientHttp2StreamMock({ + payload: serialize(message), // 21 bytes + frameLengths: testCase.frameLengths + }); + callStream.once('data', assert2.mustCall((buffer) => { + assert.strictEqual(buffer.toString('utf8'), message); + })); + callStream.once('end', assert2.mustCall(() => {})); + callStream.attachHttp2Stream(http2Stream); + assert2.afterMustCallsSatisfied(done); + }); + }); + + [ + { + description: 'all data is supplied in a single frame', + frameLengths: [] + }, + { + description: 'frames are split between delimited messages', + frameLengths: [21] + }, + { + description: 'frames are split within messages', + frameLengths: [10, 22] + }, + { + description: 'part of 2nd message\'s header is in first frame', + frameLengths: [24] + }, + { + description: 'frames are split into bytes', + frameLengths: range(0, 41).map(() => 1) + } + ].forEach((testCase: { description: string, frameLengths: number[] }) => { + it(`should handle two messages where ${testCase.description}`, (done) => { + const callStream = new Http2CallStream('foo', callStreamArgs, filterStackFactory); + const http2Stream = new ClientHttp2StreamMock({ + payload: Buffer.concat([serialize(message), serialize(message)]), // 42 bytes + frameLengths: testCase.frameLengths + }); + callStream.once('data', assert2.mustCall((buffer) => { + assert.strictEqual(buffer.toString('utf8'), message); + })); + callStream.once('data', assert2.mustCall((buffer) => { + assert.strictEqual(buffer.toString('utf8'), message); + })); + callStream.once('end', assert2.mustCall(() => {})); + callStream.attachHttp2Stream(http2Stream); + assert2.afterMustCallsSatisfied(done); + }); + }); + + it('should send buffered writes', (done) => { + const callStream = new Http2CallStream('foo', callStreamArgs, filterStackFactory); + const http2Stream = new ClientHttp2StreamMock({ + payload: Buffer.alloc(0), + frameLengths: [] + }); + let streamFlushed = false; + http2Stream.once('write', assert2.mustCall((chunk: Buffer) => { + const dataLength = chunk.readInt32BE(1); + const encodedMessage = chunk.slice(5).toString('utf8'); + assert.strictEqual(dataLength, message.length); + assert.strictEqual(encodedMessage, message); + streamFlushed = true; + })); + callStream.write({ + message: Buffer.from(message) + }, assert2.mustCall(() => { + // Ensure this is called only after contents are written to http2Stream + assert.ok(streamFlushed); + })); + callStream.end(assert2.mustCall(() => {})); + callStream.attachHttp2Stream(http2Stream); + assert2.afterMustCallsSatisfied(done); + }); + + it('should cause data chunks in write calls afterward to be written to the given stream', (done) => { + const callStream = new Http2CallStream('foo', callStreamArgs, filterStackFactory); + const http2Stream = new ClientHttp2StreamMock({ + payload: Buffer.alloc(0), + frameLengths: [] + }); + http2Stream.once('write', assert2.mustCall((chunk: Buffer) => { + const dataLength = chunk.readInt32BE(1); + const encodedMessage = chunk.slice(5).toString('utf8'); + assert.strictEqual(dataLength, message.length); + assert.strictEqual(encodedMessage, message); + })); + callStream.attachHttp2Stream(http2Stream); + callStream.write({ + message: Buffer.from(message) + }, assert2.mustCall(() => {})); + callStream.end(assert2.mustCall(() => {})); + assert2.afterMustCallsSatisfied(done); + }); + + it('should handle underlying stream errors', () => { + const callStream = new Http2CallStream('foo', callStreamArgs, filterStackFactory); + const http2Stream = new ClientHttp2StreamMock({ + payload: Buffer.alloc(0), + frameLengths: [] + }); + callStream.once('status', assert2.mustCall((status) => { + assert.strictEqual(status.code, Status.INTERNAL); + })); + callStream.attachHttp2Stream(http2Stream); + http2Stream.emit('error'); + }); + }); +});