mirror of
https://github.com/grpc/grpc-node.git
synced 2025-12-08 18:23:54 +00:00
Merge pull request #1042 from murgatroid99/grpc-js_disconnect_status
Return UNAVAILABLE status on TCP disconnect
This commit is contained in:
commit
3674106e08
@ -27,6 +27,7 @@ import { Metadata } from './metadata';
|
||||
import { ObjectDuplex, WriteCallback } from './object-stream';
|
||||
import { StreamDecoder } from './stream-decoder';
|
||||
import { ChannelImplementation } from './channel';
|
||||
import { Subchannel } from './subchannel';
|
||||
|
||||
const {
|
||||
HTTP2_HEADER_STATUS,
|
||||
@ -112,6 +113,9 @@ export class Http2CallStream extends Duplex implements Call {
|
||||
// This is populated (non-null) if and only if the call has ended
|
||||
private finalStatus: StatusObject | null = null;
|
||||
|
||||
private subchannel: Subchannel | null = null;
|
||||
private disconnectListener: () => void;
|
||||
|
||||
constructor(
|
||||
private readonly methodName: string,
|
||||
private readonly channel: ChannelImplementation,
|
||||
@ -122,6 +126,9 @@ export class Http2CallStream extends Duplex implements Call {
|
||||
super({ objectMode: true });
|
||||
this.filterStack = filterStackFactory.createFilter(this);
|
||||
this.credentials = channelCallCredentials;
|
||||
this.disconnectListener = () => {
|
||||
this.endCall({code: Status.UNAVAILABLE, details: 'Connection dropped', metadata: new Metadata()});
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
@ -142,6 +149,10 @@ export class Http2CallStream extends Duplex implements Call {
|
||||
process.nextTick(() => {
|
||||
this.emit('status', status);
|
||||
});
|
||||
if (this.subchannel) {
|
||||
this.subchannel.callUnref();
|
||||
this.subchannel.removeDisconnectListener(this.disconnectListener);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -239,11 +250,14 @@ export class Http2CallStream extends Duplex implements Call {
|
||||
})();
|
||||
}
|
||||
|
||||
attachHttp2Stream(stream: http2.ClientHttp2Stream): void {
|
||||
attachHttp2Stream(stream: http2.ClientHttp2Stream, subchannel: Subchannel): void {
|
||||
if (this.finalStatus !== null) {
|
||||
stream.close(NGHTTP2_CANCEL);
|
||||
} else {
|
||||
this.http2Stream = stream;
|
||||
this.subchannel = subchannel;
|
||||
subchannel.addDisconnectListener(this.disconnectListener);
|
||||
subchannel.callRef();
|
||||
stream.on('response', (headers, flags) => {
|
||||
switch (headers[':status']) {
|
||||
// TODO(murgatroid99): handle 100 and 101
|
||||
|
||||
@ -84,6 +84,13 @@ export class Subchannel {
|
||||
*/
|
||||
private stateListeners: ConnectivityStateListener[] = [];
|
||||
|
||||
/**
|
||||
* A list of listener functions that will be called when the underlying
|
||||
* socket disconnects. Used for ending active calls with an UNAVAILABLE
|
||||
* status.
|
||||
*/
|
||||
private disconnectListeners: (() => void)[] = [];
|
||||
|
||||
private backoffTimeout: BackoffTimeout;
|
||||
|
||||
/**
|
||||
@ -274,6 +281,11 @@ export class Subchannel {
|
||||
switch (newState) {
|
||||
case ConnectivityState.READY:
|
||||
this.stopBackoff();
|
||||
this.session!.socket.once('close', () => {
|
||||
for (const listener of this.disconnectListeners) {
|
||||
listener();
|
||||
}
|
||||
});
|
||||
break;
|
||||
case ConnectivityState.CONNECTING:
|
||||
this.startBackoff();
|
||||
@ -322,7 +334,7 @@ export class Subchannel {
|
||||
}
|
||||
}
|
||||
|
||||
private callRef() {
|
||||
callRef() {
|
||||
if (this.callRefcount === 0) {
|
||||
if (this.session) {
|
||||
this.session.ref();
|
||||
@ -332,7 +344,7 @@ export class Subchannel {
|
||||
this.callRefcount += 1;
|
||||
}
|
||||
|
||||
private callUnref() {
|
||||
callUnref() {
|
||||
this.callRefcount -= 1;
|
||||
if (this.callRefcount === 0) {
|
||||
if (this.session) {
|
||||
@ -376,11 +388,7 @@ export class Subchannel {
|
||||
headers[HTTP2_HEADER_PATH] = callStream.getMethod();
|
||||
headers[HTTP2_HEADER_TE] = 'trailers';
|
||||
const http2Stream = this.session!.request(headers);
|
||||
this.callRef();
|
||||
http2Stream.on('close', () => {
|
||||
this.callUnref();
|
||||
});
|
||||
callStream.attachHttp2Stream(http2Stream);
|
||||
callStream.attachHttp2Stream(http2Stream, this);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -434,6 +442,17 @@ export class Subchannel {
|
||||
}
|
||||
}
|
||||
|
||||
addDisconnectListener(listener: () => void) {
|
||||
this.disconnectListeners.push(listener);
|
||||
}
|
||||
|
||||
removeDisconnectListener(listener: () => void) {
|
||||
const listenerIndex = this.disconnectListeners.indexOf(listener);
|
||||
if (listenerIndex > -1) {
|
||||
this.disconnectListeners.splice(listenerIndex, 1);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Reset the backoff timeout, and immediately start connecting if in backoff.
|
||||
*/
|
||||
|
||||
@ -1,442 +0,0 @@
|
||||
/*
|
||||
* Copyright 2019 gRPC authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*
|
||||
*/
|
||||
|
||||
import * as assert from 'assert';
|
||||
import { OutgoingHttpHeaders } from 'http';
|
||||
import * as http2 from 'http2';
|
||||
import { range } from 'lodash';
|
||||
import * as stream from 'stream';
|
||||
|
||||
import { CallCredentials } from '../src/call-credentials';
|
||||
import { Http2CallStream } from '../src/call-stream';
|
||||
import { Channel, ChannelImplementation } from '../src/channel';
|
||||
import { CompressionFilterFactory } from '../src/compression-filter';
|
||||
import { Status } from '../src/constants';
|
||||
import { FilterStackFactory } from '../src/filter-stack';
|
||||
import { Metadata } from '../src/metadata';
|
||||
|
||||
import { assert2, mockFunction } from './common';
|
||||
import { EventEmitter } from 'events';
|
||||
|
||||
interface DataFrames {
|
||||
payload: Buffer;
|
||||
frameLengths: number[];
|
||||
}
|
||||
|
||||
const { HTTP2_HEADER_STATUS } = http2.constants;
|
||||
|
||||
function serialize(data: string): Buffer {
|
||||
const header: Buffer = Buffer.alloc(5);
|
||||
header.writeUInt8(0, 0); // TODO: Uncompressed only
|
||||
header.writeInt32BE(data.length, 1);
|
||||
return Buffer.concat([header, Buffer.from(data, 'utf8')]);
|
||||
}
|
||||
|
||||
class ClientHttp2StreamMock extends stream.Duplex
|
||||
implements http2.ClientHttp2Stream {
|
||||
constructor(private readonly dataFrames: DataFrames) {
|
||||
super();
|
||||
}
|
||||
emitResponse(responseCode: number, metadata?: Metadata) {
|
||||
this.emit('response', {
|
||||
[HTTP2_HEADER_STATUS]: responseCode,
|
||||
...(metadata ? metadata.toHttp2Headers() : {}),
|
||||
});
|
||||
}
|
||||
bytesRead = 0;
|
||||
dataFrame = 0;
|
||||
aborted = false;
|
||||
closed = false;
|
||||
destroyed = false;
|
||||
endAfterHeaders = false;
|
||||
pending = false;
|
||||
rstCode = 0;
|
||||
readonly bufferSize: number = 0;
|
||||
readonly sentHeaders: OutgoingHttpHeaders = {};
|
||||
readonly sentInfoHeaders?: OutgoingHttpHeaders[] = [];
|
||||
readonly sentTrailers?: OutgoingHttpHeaders = undefined;
|
||||
// tslint:disable:no-any
|
||||
session: http2.Http2Session = { socket: new EventEmitter() } as any;
|
||||
state: http2.StreamState = {} as any;
|
||||
// tslint:enable:no-any
|
||||
close = mockFunction;
|
||||
priority = mockFunction;
|
||||
rstStream = mockFunction;
|
||||
rstWithNoError = mockFunction;
|
||||
rstWithProtocolError = mockFunction;
|
||||
rstWithCancel = mockFunction;
|
||||
rstWithRefuse = mockFunction;
|
||||
rstWithInternalError = mockFunction;
|
||||
setTimeout = mockFunction;
|
||||
_read() {
|
||||
if (this.dataFrame === this.dataFrames.frameLengths.length) {
|
||||
if (this.bytesRead < this.dataFrames.payload.length) {
|
||||
this.push(
|
||||
this.dataFrames.payload.slice(
|
||||
this.bytesRead,
|
||||
this.dataFrames.payload.length
|
||||
)
|
||||
);
|
||||
}
|
||||
this.push(null);
|
||||
return;
|
||||
}
|
||||
const from = this.bytesRead;
|
||||
this.bytesRead += this.dataFrames.frameLengths[this.dataFrame++];
|
||||
this.push(this.dataFrames.payload.slice(from, this.bytesRead));
|
||||
}
|
||||
_write(chunk: Buffer, encoding: string, cb: Function) {
|
||||
this.emit('write', chunk);
|
||||
cb();
|
||||
}
|
||||
sendTrailers(headers: OutgoingHttpHeaders) {
|
||||
return this;
|
||||
}
|
||||
}
|
||||
|
||||
describe('CallStream', () => {
|
||||
const callStreamArgs = {
|
||||
deadline: Infinity,
|
||||
flags: 0,
|
||||
host: '',
|
||||
parentCall: null,
|
||||
};
|
||||
/* A CompressionFilter is now necessary to frame and deframe messages.
|
||||
* Currently the channel is unused, so we can replace it with an empty object,
|
||||
* but this might break if we start checking channel arguments, in which case
|
||||
* we will need a more sophisticated fake */
|
||||
const filterStackFactory = new FilterStackFactory([
|
||||
new CompressionFilterFactory({} as Channel),
|
||||
]);
|
||||
const message = 'eat this message'; // 16 bytes
|
||||
|
||||
beforeEach(() => {
|
||||
assert2.clearMustCalls();
|
||||
});
|
||||
|
||||
it('should emit a metadata event when it receives a response event', done => {
|
||||
const responseMetadata = new Metadata();
|
||||
responseMetadata.add('key', 'value');
|
||||
const callStream = new Http2CallStream(
|
||||
'foo',
|
||||
{} as ChannelImplementation,
|
||||
callStreamArgs,
|
||||
filterStackFactory,
|
||||
CallCredentials.createEmpty()
|
||||
);
|
||||
|
||||
const http2Stream = new ClientHttp2StreamMock({
|
||||
payload: Buffer.alloc(0),
|
||||
frameLengths: [],
|
||||
});
|
||||
callStream.once(
|
||||
'metadata',
|
||||
assert2.mustCall(metadata => {
|
||||
assert.deepStrictEqual(metadata.get('key'), ['value']);
|
||||
})
|
||||
);
|
||||
callStream.attachHttp2Stream(http2Stream);
|
||||
http2Stream.emitResponse(200, responseMetadata);
|
||||
assert2.afterMustCallsSatisfied(done);
|
||||
});
|
||||
|
||||
describe('should end a call with an error if a stream was closed', () => {
|
||||
const c = http2.constants;
|
||||
const s = Status;
|
||||
const errorCodeMapping = {
|
||||
[c.NGHTTP2_NO_ERROR]: s.INTERNAL,
|
||||
[c.NGHTTP2_PROTOCOL_ERROR]: s.INTERNAL,
|
||||
[c.NGHTTP2_INTERNAL_ERROR]: s.INTERNAL,
|
||||
[c.NGHTTP2_FLOW_CONTROL_ERROR]: s.INTERNAL,
|
||||
[c.NGHTTP2_SETTINGS_TIMEOUT]: s.INTERNAL,
|
||||
[c.NGHTTP2_STREAM_CLOSED]: null,
|
||||
[c.NGHTTP2_FRAME_SIZE_ERROR]: s.INTERNAL,
|
||||
[c.NGHTTP2_REFUSED_STREAM]: s.UNAVAILABLE,
|
||||
[c.NGHTTP2_CANCEL]: s.CANCELLED,
|
||||
[c.NGHTTP2_COMPRESSION_ERROR]: s.INTERNAL,
|
||||
[c.NGHTTP2_CONNECT_ERROR]: s.INTERNAL,
|
||||
[c.NGHTTP2_ENHANCE_YOUR_CALM]: s.RESOURCE_EXHAUSTED,
|
||||
[c.NGHTTP2_INADEQUATE_SECURITY]: s.PERMISSION_DENIED,
|
||||
};
|
||||
const keys = Object.keys(errorCodeMapping).map(key => Number(key));
|
||||
keys.forEach(key => {
|
||||
const value = errorCodeMapping[key];
|
||||
// A null value indicates: behavior isn't specified, so skip this test.
|
||||
const maybeSkip = (fn: typeof it) => (value ? fn : fn.skip);
|
||||
maybeSkip(it)(`for error code ${key}`, () => {
|
||||
return new Promise((resolve, reject) => {
|
||||
const callStream = new Http2CallStream(
|
||||
'foo',
|
||||
{} as ChannelImplementation,
|
||||
callStreamArgs,
|
||||
filterStackFactory,
|
||||
CallCredentials.createEmpty()
|
||||
);
|
||||
const http2Stream = new ClientHttp2StreamMock({
|
||||
payload: Buffer.alloc(0),
|
||||
frameLengths: [],
|
||||
});
|
||||
callStream.attachHttp2Stream(http2Stream);
|
||||
callStream.once('status', status => {
|
||||
try {
|
||||
assert.strictEqual(status.code, value);
|
||||
resolve();
|
||||
} catch (e) {
|
||||
reject(e);
|
||||
}
|
||||
});
|
||||
http2Stream.rstCode = Number(key);
|
||||
http2Stream.emit('close');
|
||||
});
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
it('should have functioning getters', done => {
|
||||
const callStream = new Http2CallStream(
|
||||
'foo',
|
||||
{} as ChannelImplementation,
|
||||
callStreamArgs,
|
||||
filterStackFactory,
|
||||
CallCredentials.createEmpty()
|
||||
);
|
||||
assert.strictEqual(callStream.getDeadline(), callStreamArgs.deadline);
|
||||
assert.strictEqual(callStream.getStatus(), null);
|
||||
const credentials = CallCredentials.createEmpty();
|
||||
callStream.setCredentials(credentials);
|
||||
assert.strictEqual(callStream.getCredentials(), credentials);
|
||||
callStream.on(
|
||||
'status',
|
||||
assert2.mustCall(status => {
|
||||
assert.strictEqual(status.code, Status.CANCELLED);
|
||||
assert.strictEqual(status.details, ';)');
|
||||
assert.strictEqual(callStream.getStatus(), status);
|
||||
})
|
||||
);
|
||||
callStream.cancelWithStatus(Status.CANCELLED, ';)');
|
||||
// TODO: getPeer
|
||||
assert2.afterMustCallsSatisfied(done);
|
||||
});
|
||||
|
||||
describe('attachHttp2Stream', () => {
|
||||
it('should handle an empty message', done => {
|
||||
const callStream = new Http2CallStream(
|
||||
'foo',
|
||||
{} as ChannelImplementation,
|
||||
callStreamArgs,
|
||||
filterStackFactory,
|
||||
CallCredentials.createEmpty()
|
||||
);
|
||||
const http2Stream = new ClientHttp2StreamMock({
|
||||
payload: serialize(''),
|
||||
frameLengths: [],
|
||||
});
|
||||
callStream.once(
|
||||
'data',
|
||||
assert2.mustCall(buffer => {
|
||||
assert.strictEqual(buffer.toString('utf8'), '');
|
||||
})
|
||||
);
|
||||
callStream.attachHttp2Stream(http2Stream);
|
||||
assert2.afterMustCallsSatisfied(done);
|
||||
});
|
||||
|
||||
[
|
||||
{
|
||||
description: 'all data is supplied in a single frame',
|
||||
frameLengths: [],
|
||||
},
|
||||
{
|
||||
description: 'frames are split along header field delimiters',
|
||||
frameLengths: [1, 4],
|
||||
},
|
||||
{
|
||||
description:
|
||||
'portions of header fields are split between different frames',
|
||||
frameLengths: [2, 1, 1, 4],
|
||||
},
|
||||
{
|
||||
description: 'frames are split into bytes',
|
||||
frameLengths: range(0, 20).map(() => 1),
|
||||
},
|
||||
].forEach((testCase: { description: string; frameLengths: number[] }) => {
|
||||
it(`should handle a short message where ${
|
||||
testCase.description
|
||||
}`, done => {
|
||||
const callStream = new Http2CallStream(
|
||||
'foo',
|
||||
{} as ChannelImplementation,
|
||||
callStreamArgs,
|
||||
filterStackFactory,
|
||||
CallCredentials.createEmpty()
|
||||
);
|
||||
const http2Stream = new ClientHttp2StreamMock({
|
||||
payload: serialize(message), // 21 bytes
|
||||
frameLengths: testCase.frameLengths,
|
||||
});
|
||||
callStream.once(
|
||||
'data',
|
||||
assert2.mustCall(buffer => {
|
||||
assert.strictEqual(buffer.toString('utf8'), message);
|
||||
})
|
||||
);
|
||||
callStream.once('end', assert2.mustCall(() => {}));
|
||||
callStream.attachHttp2Stream(http2Stream);
|
||||
assert2.afterMustCallsSatisfied(done);
|
||||
});
|
||||
});
|
||||
|
||||
[
|
||||
{
|
||||
description: 'all data is supplied in a single frame',
|
||||
frameLengths: [],
|
||||
},
|
||||
{
|
||||
description: 'frames are split between delimited messages',
|
||||
frameLengths: [21],
|
||||
},
|
||||
{
|
||||
description: 'frames are split within messages',
|
||||
frameLengths: [10, 22],
|
||||
},
|
||||
{
|
||||
description: "part of 2nd message's header is in first frame",
|
||||
frameLengths: [24],
|
||||
},
|
||||
{
|
||||
description: 'frames are split into bytes',
|
||||
frameLengths: range(0, 41).map(() => 1),
|
||||
},
|
||||
].forEach((testCase: { description: string; frameLengths: number[] }) => {
|
||||
it(`should handle two messages where ${testCase.description}`, done => {
|
||||
const callStream = new Http2CallStream(
|
||||
'foo',
|
||||
{} as ChannelImplementation,
|
||||
callStreamArgs,
|
||||
filterStackFactory,
|
||||
CallCredentials.createEmpty()
|
||||
);
|
||||
const http2Stream = new ClientHttp2StreamMock({
|
||||
payload: Buffer.concat([serialize(message), serialize(message)]), // 42 bytes
|
||||
frameLengths: testCase.frameLengths,
|
||||
});
|
||||
callStream.once(
|
||||
'data',
|
||||
assert2.mustCall(buffer => {
|
||||
assert.strictEqual(buffer.toString('utf8'), message);
|
||||
})
|
||||
);
|
||||
callStream.once(
|
||||
'data',
|
||||
assert2.mustCall(buffer => {
|
||||
assert.strictEqual(buffer.toString('utf8'), message);
|
||||
})
|
||||
);
|
||||
callStream.once('end', assert2.mustCall(() => {}));
|
||||
callStream.attachHttp2Stream(http2Stream);
|
||||
assert2.afterMustCallsSatisfied(done);
|
||||
});
|
||||
});
|
||||
|
||||
it('should send buffered writes', done => {
|
||||
const callStream = new Http2CallStream(
|
||||
'foo',
|
||||
{} as ChannelImplementation,
|
||||
callStreamArgs,
|
||||
filterStackFactory,
|
||||
CallCredentials.createEmpty()
|
||||
);
|
||||
const http2Stream = new ClientHttp2StreamMock({
|
||||
payload: Buffer.alloc(0),
|
||||
frameLengths: [],
|
||||
});
|
||||
let streamFlushed = false;
|
||||
http2Stream.once(
|
||||
'write',
|
||||
assert2.mustCall((chunk: Buffer) => {
|
||||
const dataLength = chunk.readInt32BE(1);
|
||||
const encodedMessage = chunk.slice(5).toString('utf8');
|
||||
assert.strictEqual(dataLength, message.length);
|
||||
assert.strictEqual(encodedMessage, message);
|
||||
streamFlushed = true;
|
||||
})
|
||||
);
|
||||
callStream.write(
|
||||
{ message: Buffer.from(message) },
|
||||
assert2.mustCall(() => {
|
||||
// Ensure this is called only after contents are written to http2Stream
|
||||
assert.ok(streamFlushed);
|
||||
})
|
||||
);
|
||||
callStream.end(assert2.mustCall(() => {}));
|
||||
callStream.attachHttp2Stream(http2Stream);
|
||||
assert2.afterMustCallsSatisfied(done);
|
||||
});
|
||||
|
||||
it('should cause data chunks in write calls afterward to be written to the given stream', done => {
|
||||
const callStream = new Http2CallStream(
|
||||
'foo',
|
||||
{} as ChannelImplementation,
|
||||
callStreamArgs,
|
||||
filterStackFactory,
|
||||
CallCredentials.createEmpty()
|
||||
);
|
||||
const http2Stream = new ClientHttp2StreamMock({
|
||||
payload: Buffer.alloc(0),
|
||||
frameLengths: [],
|
||||
});
|
||||
http2Stream.once(
|
||||
'write',
|
||||
assert2.mustCall((chunk: Buffer) => {
|
||||
const dataLength = chunk.readInt32BE(1);
|
||||
const encodedMessage = chunk.slice(5).toString('utf8');
|
||||
assert.strictEqual(dataLength, message.length);
|
||||
assert.strictEqual(encodedMessage, message);
|
||||
})
|
||||
);
|
||||
callStream.attachHttp2Stream(http2Stream);
|
||||
callStream.write(
|
||||
{ message: Buffer.from(message) },
|
||||
assert2.mustCall(() => {})
|
||||
);
|
||||
callStream.end(assert2.mustCall(() => {}));
|
||||
assert2.afterMustCallsSatisfied(done);
|
||||
});
|
||||
|
||||
it('should handle underlying stream errors', () => {
|
||||
const callStream = new Http2CallStream(
|
||||
'foo',
|
||||
{} as ChannelImplementation,
|
||||
callStreamArgs,
|
||||
filterStackFactory,
|
||||
CallCredentials.createEmpty()
|
||||
);
|
||||
const http2Stream = new ClientHttp2StreamMock({
|
||||
payload: Buffer.alloc(0),
|
||||
frameLengths: [],
|
||||
});
|
||||
callStream.once(
|
||||
'status',
|
||||
assert2.mustCall(status => {
|
||||
assert.strictEqual(status.code, Status.INTERNAL);
|
||||
})
|
||||
);
|
||||
callStream.attachHttp2Stream(http2Stream);
|
||||
http2Stream.emit('error');
|
||||
});
|
||||
});
|
||||
});
|
||||
Loading…
x
Reference in New Issue
Block a user