Call Stream Unit Tests (#12)

PR-URL: #12
This commit is contained in:
Kelvin Jin 2017-09-01 19:08:28 -07:00 committed by GitHub
parent 0774aafb01
commit e302400a8a
4 changed files with 383 additions and 10 deletions

View File

@ -80,7 +80,7 @@ function makeCompileFn(globs) {
*/
gulp.task('js.core.lint', 'Emits linting errors found in src/ and test/.', () => {
const program = require('tslint').Linter.createProgram(tsconfigPath);
gulp.src([`${srcDir}/**/*.ts`, `${srcDir}/**/*.ts`])
gulp.src([`${srcDir}/**/*.ts`, `${testDir}/**/*.ts`])
.pipe(tslint({
configuration: tslintPath,
formatter: 'codeFrame',

View File

@ -81,6 +81,8 @@ enum ReadState {
READING_MESSAGE
}
const emptyBuffer = Buffer.alloc(0);
export class Http2CallStream extends Duplex implements CallStream {
public filterStack: Filter;
private statusEmitted = false;
@ -121,6 +123,18 @@ export class Http2CallStream extends Duplex implements CallStream {
}
}
private tryPush(messageBytes: Buffer, canPush: boolean): boolean {
if (canPush) {
if (!this.push(messageBytes)) {
canPush = false;
(this.http2Stream as http2.ClientHttp2Stream).pause();
}
} else {
this.unpushedReadMessages.push(messageBytes);
}
return canPush;
}
attachHttp2Stream(stream: http2.ClientHttp2Stream): void {
if (this.finalStatus !== null) {
stream.rstWithCancel();
@ -230,7 +244,12 @@ export class Http2CallStream extends Duplex implements CallStream {
if (this.readSizeRemaining === 0) {
this.readMessageSize = this.readPartialSize.readUInt32BE(0);
this.readMessageRemaining = this.readMessageSize;
this.readState = ReadState.READING_MESSAGE;
if (this.readMessageRemaining > 0) {
this.readState = ReadState.READING_MESSAGE;
} else {
canPush = this.tryPush(emptyBuffer, canPush);
this.readState = ReadState.NO_DATA;
}
}
break;
case ReadState.READING_MESSAGE:
@ -246,14 +265,7 @@ export class Http2CallStream extends Duplex implements CallStream {
const messageBytes = Buffer.concat(
this.readPartialMessage, this.readMessageSize);
// TODO(murgatroid99): Add receive message filters
if (canPush) {
if (!this.push(messageBytes)) {
canPush = false;
(this.http2Stream as http2.ClientHttp2Stream).pause();
}
} else {
this.unpushedReadMessages.push(messageBytes);
}
canPush = this.tryPush(messageBytes, canPush);
this.readState = ReadState.NO_DATA;
}
}

View File

@ -5,6 +5,14 @@ export function mockFunction(): never {
}
export namespace assert2 {
const toCall = new Map<() => void, number>();
const afterCallsQueue: Array<() => void> = [];
/**
* Assert that the given function doesn't throw an error, and then return
* its value.
* @param fn The function to evaluate.
*/
export function noThrowAndReturn<T>(fn: () => T): T {
try {
return fn();
@ -13,4 +21,59 @@ export namespace assert2 {
throw e; // for type safety only
}
}
/**
* Helper function that returns true when every function wrapped with
* mustCall has been called.
*/
function mustCallsSatisfied(): boolean {
let result = true;
toCall.forEach((value) => {
result = result && value === 0;
});
return result;
}
export function clearMustCalls(): void {
afterCallsQueue.length = 0;
}
/**
* Wraps a function to keep track of whether it was called or not.
* @param fn The function to wrap.
*/
export function mustCall<T>(fn: (...args: any[]) => T): (...args: any[]) => T {
const existingValue = toCall.get(fn);
if (existingValue !== undefined) {
toCall.set(fn, existingValue + 1);
} else {
toCall.set(fn, 1);
}
return (...args: any[]) => {
const result = fn(...args);
const existingValue = toCall.get(fn);
if (existingValue !== undefined) {
toCall.set(fn, existingValue - 1);
}
if (mustCallsSatisfied()) {
afterCallsQueue.forEach(fn => fn());
afterCallsQueue.length = 0;
}
return result;
};
}
/**
* Calls the given function when every function that was wrapped with
* mustCall has been called.
* @param fn The function to call once all mustCall-wrapped functions have
* been called.
*/
export function afterMustCallsSatisfied(fn: () => void): void {
if (!mustCallsSatisfied()) {
afterCallsQueue.push(fn);
} else {
fn();
}
}
}

View File

@ -0,0 +1,298 @@
import * as assert from 'assert';
import { CallCredentials } from '../src/call-credentials';
import { Http2CallStream } from '../src/call-stream';
import { mockFunction, assert2 } from './common';
import { Status } from '../src/constants';
import { EventEmitter } from 'events';
import { FilterStackFactory } from '../src/filter-stack';
import * as http2 from 'http2';
import { forOwn, range } from 'lodash';
import { Metadata } from '../src/metadata';
import * as stream from 'stream';
interface DataFrames {
payload: Buffer,
frameLengths: number[]
}
const {
HTTP2_HEADER_STATUS
} = http2.constants;
function serialize(data: string): Buffer {
const header: Buffer = Buffer.alloc(5);
header.writeUInt8(0, 0); // TODO: Uncompressed only
header.writeInt32BE(data.length, 1);
return Buffer.concat([header, Buffer.from(data, 'utf8')]);
}
class ClientHttp2StreamMock extends stream.Duplex implements http2.ClientHttp2Stream {
constructor(private readonly dataFrames: DataFrames) {
super();
}
emitResponse(responseCode: number, metadata?: Metadata) {
this.emit('response', {
[HTTP2_HEADER_STATUS]: responseCode,
...metadata ? metadata.toHttp2Headers() : {}
});
}
bytesRead = 0;
dataFrame = 0;
aborted: boolean;
destroyed: boolean;
rstCode: number;
session: http2.Http2Session;
state: http2.StreamState;
priority = mockFunction;
rstStream = mockFunction;
rstWithNoError = mockFunction;
rstWithProtocolError = mockFunction;
rstWithCancel = mockFunction;
rstWithRefuse = mockFunction;
rstWithInternalError = mockFunction;
setTimeout = mockFunction;
_read() {
if (this.dataFrame === this.dataFrames.frameLengths.length) {
if (this.bytesRead < this.dataFrames.payload.length) {
this.push(this.dataFrames.payload.slice(
this.bytesRead, this.dataFrames.payload.length));
}
this.push(null);
return;
}
const from = this.bytesRead;
this.bytesRead += this.dataFrames.frameLengths[this.dataFrame++];
this.push(this.dataFrames.payload.slice(from, this.bytesRead));
};
_write(chunk: Buffer, encoding: string, cb: Function) {
this.emit('write', chunk);
cb();
}
}
describe('CallStream', () => {
const callStreamArgs = {
deadline: Infinity,
credentials: CallCredentials.createEmpty(),
flags: 0
};
const filterStackFactory = new FilterStackFactory([]);
const message = 'eat this message'; // 16 bytes
beforeEach(() => {
assert2.clearMustCalls();
});
it('should emit a metadata event when it receives a response event', (done) => {
const responseMetadata = new Metadata();
responseMetadata.add('key', 'value');
const callStream = new Http2CallStream('foo', callStreamArgs, filterStackFactory);
const http2Stream = new ClientHttp2StreamMock({
payload: Buffer.alloc(0),
frameLengths: []
});
callStream.once('metadata', assert2.mustCall((metadata) => {
assert.deepStrictEqual(metadata.get('key'), ['value']);
}));
callStream.attachHttp2Stream(http2Stream);
http2Stream.emitResponse(200, responseMetadata);
assert2.afterMustCallsSatisfied(done);
});
it('should end a call with an error if a stream was closed', (done) => {
const c = http2.constants;
const s = Status;
const errorCodeMapping = {
[c.NGHTTP2_NO_ERROR]: s.INTERNAL,
[c.NGHTTP2_PROTOCOL_ERROR]: s.INTERNAL,
[c.NGHTTP2_INTERNAL_ERROR]: s.INTERNAL,
[c.NGHTTP2_FLOW_CONTROL_ERROR]: s.INTERNAL,
[c.NGHTTP2_SETTINGS_TIMEOUT]: s.INTERNAL,
[c.NGHTTP2_STREAM_CLOSED]: null,
[c.NGHTTP2_FRAME_SIZE_ERROR]: s.INTERNAL,
[c.NGHTTP2_REFUSED_STREAM]: s.UNAVAILABLE,
[c.NGHTTP2_CANCEL]: s.CANCELLED,
[c.NGHTTP2_COMPRESSION_ERROR]: s.INTERNAL,
[c.NGHTTP2_CONNECT_ERROR]: s.INTERNAL,
[c.NGHTTP2_ENHANCE_YOUR_CALM]: s.RESOURCE_EXHAUSTED,
[c.NGHTTP2_INADEQUATE_SECURITY]: s.PERMISSION_DENIED
};
forOwn(errorCodeMapping, (value: Status | null, key) => {
const callStream = new Http2CallStream('foo', callStreamArgs, filterStackFactory);
const http2Stream = new ClientHttp2StreamMock({
payload: Buffer.alloc(0),
frameLengths: []
});
callStream.attachHttp2Stream(http2Stream);
if (value !== null) {
callStream.once('status', assert2.mustCall((status) => {
assert.strictEqual(status.code, value);
}));
}
http2Stream.emit('streamClosed', Number(key));
});
assert2.afterMustCallsSatisfied(done);
});
it('should have functioning getters', (done) => {
const callStream = new Http2CallStream('foo', callStreamArgs, filterStackFactory);
assert.strictEqual(callStream.getDeadline(), callStreamArgs.deadline);
assert.strictEqual(callStream.getCredentials(), callStreamArgs.credentials);
assert.strictEqual(callStream.getStatus(), null);
callStream.on('status', assert2.mustCall((status) => {
assert.strictEqual(status.code, Status.CANCELLED);
assert.strictEqual(status.details, ';)');
assert.strictEqual(callStream.getStatus(), status);
}));
callStream.cancelWithStatus(Status.CANCELLED, ';)');
// TODO: getPeer
assert2.afterMustCallsSatisfied(done);
});
describe('attachHttp2Stream', () => {
it('should handle an empty message', (done) => {
const callStream = new Http2CallStream('foo', callStreamArgs, filterStackFactory);
const http2Stream = new ClientHttp2StreamMock({
payload: serialize(''),
frameLengths: []
});
callStream.once('data', assert2.mustCall((buffer) => {
assert.strictEqual(buffer.toString('utf8'), '');
}));
callStream.attachHttp2Stream(http2Stream);
assert2.afterMustCallsSatisfied(done);
});
[
{
description: 'all data is supplied in a single frame',
frameLengths: []
},
{
description: 'frames are split along header field delimiters',
frameLengths: [1, 4]
},
{
description: 'portions of header fields are split between different frames',
frameLengths: [2, 1, 1, 4]
},
{
description: 'frames are split into bytes',
frameLengths: range(0, 20).map(() => 1)
}
].forEach((testCase: { description: string, frameLengths: number[] }) => {
it(`should handle a short message where ${testCase.description}`, (done) => {
const callStream = new Http2CallStream('foo', callStreamArgs, filterStackFactory);
const http2Stream = new ClientHttp2StreamMock({
payload: serialize(message), // 21 bytes
frameLengths: testCase.frameLengths
});
callStream.once('data', assert2.mustCall((buffer) => {
assert.strictEqual(buffer.toString('utf8'), message);
}));
callStream.once('end', assert2.mustCall(() => {}));
callStream.attachHttp2Stream(http2Stream);
assert2.afterMustCallsSatisfied(done);
});
});
[
{
description: 'all data is supplied in a single frame',
frameLengths: []
},
{
description: 'frames are split between delimited messages',
frameLengths: [21]
},
{
description: 'frames are split within messages',
frameLengths: [10, 22]
},
{
description: 'part of 2nd message\'s header is in first frame',
frameLengths: [24]
},
{
description: 'frames are split into bytes',
frameLengths: range(0, 41).map(() => 1)
}
].forEach((testCase: { description: string, frameLengths: number[] }) => {
it(`should handle two messages where ${testCase.description}`, (done) => {
const callStream = new Http2CallStream('foo', callStreamArgs, filterStackFactory);
const http2Stream = new ClientHttp2StreamMock({
payload: Buffer.concat([serialize(message), serialize(message)]), // 42 bytes
frameLengths: testCase.frameLengths
});
callStream.once('data', assert2.mustCall((buffer) => {
assert.strictEqual(buffer.toString('utf8'), message);
}));
callStream.once('data', assert2.mustCall((buffer) => {
assert.strictEqual(buffer.toString('utf8'), message);
}));
callStream.once('end', assert2.mustCall(() => {}));
callStream.attachHttp2Stream(http2Stream);
assert2.afterMustCallsSatisfied(done);
});
});
it('should send buffered writes', (done) => {
const callStream = new Http2CallStream('foo', callStreamArgs, filterStackFactory);
const http2Stream = new ClientHttp2StreamMock({
payload: Buffer.alloc(0),
frameLengths: []
});
let streamFlushed = false;
http2Stream.once('write', assert2.mustCall((chunk: Buffer) => {
const dataLength = chunk.readInt32BE(1);
const encodedMessage = chunk.slice(5).toString('utf8');
assert.strictEqual(dataLength, message.length);
assert.strictEqual(encodedMessage, message);
streamFlushed = true;
}));
callStream.write({
message: Buffer.from(message)
}, assert2.mustCall(() => {
// Ensure this is called only after contents are written to http2Stream
assert.ok(streamFlushed);
}));
callStream.end(assert2.mustCall(() => {}));
callStream.attachHttp2Stream(http2Stream);
assert2.afterMustCallsSatisfied(done);
});
it('should cause data chunks in write calls afterward to be written to the given stream', (done) => {
const callStream = new Http2CallStream('foo', callStreamArgs, filterStackFactory);
const http2Stream = new ClientHttp2StreamMock({
payload: Buffer.alloc(0),
frameLengths: []
});
http2Stream.once('write', assert2.mustCall((chunk: Buffer) => {
const dataLength = chunk.readInt32BE(1);
const encodedMessage = chunk.slice(5).toString('utf8');
assert.strictEqual(dataLength, message.length);
assert.strictEqual(encodedMessage, message);
}));
callStream.attachHttp2Stream(http2Stream);
callStream.write({
message: Buffer.from(message)
}, assert2.mustCall(() => {}));
callStream.end(assert2.mustCall(() => {}));
assert2.afterMustCallsSatisfied(done);
});
it('should handle underlying stream errors', () => {
const callStream = new Http2CallStream('foo', callStreamArgs, filterStackFactory);
const http2Stream = new ClientHttp2StreamMock({
payload: Buffer.alloc(0),
frameLengths: []
});
callStream.once('status', assert2.mustCall((status) => {
assert.strictEqual(status.code, Status.INTERNAL);
}));
callStream.attachHttp2Stream(http2Stream);
http2Stream.emit('error');
});
});
});