Merge pull request #2409 from murgatroid99/v1.8.x_merge

Merge v1.8.x into master
This commit is contained in:
Michael Lumish 2023-04-05 16:50:12 -07:00 committed by GitHub
commit 90de58ce37
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
24 changed files with 1081 additions and 722 deletions

View File

@ -1,6 +1,6 @@
{
"name": "@grpc/grpc-js-xds",
"version": "1.8.0",
"version": "1.8.1",
"description": "Plugin for @grpc/grpc-js. Adds the xds:// URL scheme and associated features.",
"main": "build/src/index.js",
"scripts": {

View File

@ -342,11 +342,13 @@ export class XdsClient {
this.adsNode = {
...bootstrapInfo.node,
user_agent_name: userAgentName,
user_agent_version: clientVersion,
client_features: ['envoy.lb.does_not_support_overprovisioning'],
};
this.lrsNode = {
...bootstrapInfo.node,
user_agent_name: userAgentName,
user_agent_version: clientVersion,
client_features: ['envoy.lrs.supports_send_all_clusters'],
};
setCsdsClientNode(this.adsNode);

View File

@ -32,6 +32,8 @@ const SUPPPORTED_HEADER_MATCH_SPECIFIERS = [
'suffix_match'];
const SUPPORTED_CLUSTER_SPECIFIERS = ['cluster', 'weighted_clusters', 'cluster_header'];
const UINT32_MAX = 0xFFFFFFFF;
function durationToMs(duration: Duration__Output | null): number | null {
if (duration === null) {
return null;
@ -130,14 +132,11 @@ export class RdsState extends BaseXdsStreamState<RouteConfiguration__Output> imp
}
}
if (route.route!.cluster_specifier === 'weighted_clusters') {
if (route.route.weighted_clusters!.total_weight?.value === 0) {
return false;
}
let weightSum = 0;
for (const clusterWeight of route.route.weighted_clusters!.clusters) {
weightSum += clusterWeight.weight?.value ?? 0;
}
if (weightSum !== route.route.weighted_clusters!.total_weight?.value ?? 100) {
if (weightSum === 0 || weightSum > UINT32_MAX) {
return false;
}
if (EXPERIMENTAL_FAULT_INJECTION) {

View File

@ -62,6 +62,7 @@ Many channel arguments supported in `grpc` are not supported in `@grpc/grpc-js`.
- `grpc.enable_retries`
- `grpc.per_rpc_retry_buffer_size`
- `grpc.retry_buffer_size`
- `grpc.service_config_disable_resolution`
- `grpc-node.max_session_memory`
- `channelOverride`
- `channelFactoryOverride`

View File

@ -1,6 +1,6 @@
{
"name": "@grpc/grpc-js",
"version": "1.8.0",
"version": "1.8.13",
"description": "gRPC Library for Node - pure JS implementation",
"homepage": "https://grpc.io/",
"repository": "https://github.com/grpc/grpc-node/tree/master/packages/grpc-js",

View File

@ -55,6 +55,7 @@ export interface ChannelOptions {
'grpc.max_connection_age_ms'?: number;
'grpc.max_connection_age_grace_ms'?: number;
'grpc-node.max_session_memory'?: number;
'grpc.service_config_disable_resolution'?: number;
// eslint-disable-next-line @typescript-eslint/no-explicit-any
[key: string]: any;
}
@ -87,6 +88,7 @@ export const recognizedOptions = {
'grpc.max_connection_age_ms': true,
'grpc.max_connection_age_grace_ms': true,
'grpc-node.max_session_memory': true,
'grpc.service_config_disable_resolution': true,
};
export function channelOptionsEqual(

View File

@ -51,8 +51,45 @@ export function getDeadlineTimeoutString(deadline: Deadline) {
throw new Error('Deadline is too far in the future')
}
/**
* See https://nodejs.org/api/timers.html#settimeoutcallback-delay-args
* In particular, "When delay is larger than 2147483647 or less than 1, the
* delay will be set to 1. Non-integer delays are truncated to an integer."
* This number of milliseconds is almost 25 days.
*/
const MAX_TIMEOUT_TIME = 2147483647;
/**
* Get the timeout value that should be passed to setTimeout now for the timer
* to end at the deadline. For any deadline before now, the timer should end
* immediately, represented by a value of 0. For any deadline more than
* MAX_TIMEOUT_TIME milliseconds in the future, a timer cannot be set that will
* end at that time, so it is treated as infinitely far in the future.
* @param deadline
* @returns
*/
export function getRelativeTimeout(deadline: Deadline) {
const deadlineMs = deadline instanceof Date ? deadline.getTime() : deadline;
const now = new Date().getTime();
return deadlineMs - now;
const timeout = deadlineMs - now;
if (timeout < 0) {
return 0;
} else if (timeout > MAX_TIMEOUT_TIME) {
return Infinity
} else {
return timeout;
}
}
export function deadlineToString(deadline: Deadline): string {
if (deadline instanceof Date) {
return deadline.toISOString();
} else {
const dateDeadline = new Date(deadline);
if (Number.isNaN(dateDeadline.getTime())) {
return '' + deadline;
} else {
return dateDeadline.toISOString();
}
}
}

View File

@ -237,7 +237,7 @@ export const getClientChannel = (client: Client) => {
export { StatusBuilder };
export { Listener } from './call-interface';
export { Listener, InterceptingListener } from './call-interface';
export {
Requester,
@ -248,6 +248,7 @@ export {
InterceptorProvider,
InterceptingCall,
InterceptorConfigurationError,
NextCall
} from './client-interceptors';
export {

View File

@ -46,11 +46,12 @@ import { LoadBalancingCall } from './load-balancing-call';
import { CallCredentials } from './call-credentials';
import { Call, CallStreamOptions, InterceptingListener, MessageContext, StatusObject } from './call-interface';
import { SubchannelCall } from './subchannel-call';
import { Deadline, getDeadlineTimeoutString } from './deadline';
import { Deadline, deadlineToString, getDeadlineTimeoutString } from './deadline';
import { ResolvingCall } from './resolving-call';
import { getNextCallNumber } from './call-number';
import { restrictControlPlaneStatusCode } from './control-plane-status';
import { MessageBufferTracker, RetryingCall, RetryThrottler } from './retrying-call';
import { BaseSubchannelWrapper, ConnectivityStateListener, SubchannelInterface } from './subchannel-interface';
/**
* See https://nodejs.org/api/timers.html#timers_setinterval_callback_delay_args
@ -84,6 +85,32 @@ const RETRY_THROTTLER_MAP: Map<string, RetryThrottler> = new Map();
const DEFAULT_RETRY_BUFFER_SIZE_BYTES = 1<<24; // 16 MB
const DEFAULT_PER_RPC_RETRY_BUFFER_SIZE_BYTES = 1<<20; // 1 MB
class ChannelSubchannelWrapper extends BaseSubchannelWrapper implements SubchannelInterface {
private refCount = 0;
private subchannelStateListener: ConnectivityStateListener;
constructor(childSubchannel: SubchannelInterface, private channel: InternalChannel) {
super(childSubchannel);
this.subchannelStateListener = (subchannel, previousState, newState, keepaliveTime) => {
channel.throttleKeepalive(keepaliveTime);
};
childSubchannel.addConnectivityStateListener(this.subchannelStateListener);
}
ref(): void {
this.child.ref();
this.refCount += 1;
}
unref(): void {
this.child.unref();
this.refCount -= 1;
if (this.refCount <= 0) {
this.child.removeConnectivityStateListener(this.subchannelStateListener);
this.channel.removeWrappedSubchannel(this);
}
}
}
export class InternalChannel {
private resolvingLoadBalancer: ResolvingLoadBalancer;
@ -116,8 +143,10 @@ export class InternalChannel {
* configSelector becomes set or the channel state becomes anything other
* than TRANSIENT_FAILURE.
*/
private currentResolutionError: StatusObject | null = null;
private retryBufferTracker: MessageBufferTracker;
private currentResolutionError: StatusObject | null = null;
private retryBufferTracker: MessageBufferTracker;
private keepaliveTime: number;
private wrappedSubchannels: Set<ChannelSubchannelWrapper> = new Set();
// Channelz info
private readonly channelzEnabled: boolean = true;
@ -190,6 +219,7 @@ export class InternalChannel {
options['grpc.retry_buffer_size'] ?? DEFAULT_RETRY_BUFFER_SIZE_BYTES,
options['grpc.per_rpc_retry_buffer_size'] ?? DEFAULT_PER_RPC_RETRY_BUFFER_SIZE_BYTES
);
this.keepaliveTime = options['grpc.keepalive_time_ms'] ?? -1;
const channelControlHelper: ChannelControlHelper = {
createSubchannel: (
subchannelAddress: SubchannelAddress,
@ -201,10 +231,13 @@ export class InternalChannel {
Object.assign({}, this.options, subchannelArgs),
this.credentials
);
subchannel.throttleKeepalive(this.keepaliveTime);
if (this.channelzEnabled) {
this.channelzTrace.addTrace('CT_INFO', 'Created subchannel or used existing subchannel', subchannel.getChannelzRef());
}
return subchannel;
const wrappedSubchannel = new ChannelSubchannelWrapper(subchannel, this);
this.wrappedSubchannels.add(wrappedSubchannel);
return wrappedSubchannel;
},
updateState: (connectivityState: ConnectivityState, picker: Picker) => {
this.currentPicker = picker;
@ -369,6 +402,19 @@ export class InternalChannel {
}
}
throttleKeepalive(newKeepaliveTime: number) {
if (newKeepaliveTime > this.keepaliveTime) {
this.keepaliveTime = newKeepaliveTime;
for (const wrappedSubchannel of this.wrappedSubchannels) {
wrappedSubchannel.throttleKeepalive(newKeepaliveTime);
}
}
}
removeWrappedSubchannel(wrappedSubchannel: ChannelSubchannelWrapper) {
this.wrappedSubchannels.delete(wrappedSubchannel);
}
doPick(metadata: Metadata, extraPickInfo: {[key: string]: string}) {
return this.currentPicker.pick({metadata: metadata, extraPickInfo: extraPickInfo});
}
@ -469,7 +515,7 @@ export class InternalChannel {
'] method="' +
method +
'", deadline=' +
deadline
deadlineToString(deadline)
);
const finalOptions: CallStreamOptions = {
deadline: deadline,

View File

@ -205,11 +205,11 @@ class OutlierDetectionSubchannelWrapper extends BaseSubchannelWrapper implements
constructor(childSubchannel: SubchannelInterface, private mapEntry?: MapEntry) {
super(childSubchannel);
this.childSubchannelState = childSubchannel.getConnectivityState();
childSubchannel.addConnectivityStateListener((subchannel, previousState, newState) => {
childSubchannel.addConnectivityStateListener((subchannel, previousState, newState, keepaliveTime) => {
this.childSubchannelState = newState;
if (!this.ejected) {
for (const listener of this.stateListeners) {
listener(this, previousState, newState);
listener(this, previousState, newState, keepaliveTime);
}
}
});
@ -265,14 +265,14 @@ class OutlierDetectionSubchannelWrapper extends BaseSubchannelWrapper implements
eject() {
this.ejected = true;
for (const listener of this.stateListeners) {
listener(this, this.childSubchannelState, ConnectivityState.TRANSIENT_FAILURE);
listener(this, this.childSubchannelState, ConnectivityState.TRANSIENT_FAILURE, -1);
}
}
uneject() {
this.ejected = false;
for (const listener of this.stateListeners) {
listener(this, ConnectivityState.TRANSIENT_FAILURE, this.childSubchannelState);
listener(this, ConnectivityState.TRANSIENT_FAILURE, this.childSubchannelState, -1);
}
}

View File

@ -33,6 +33,7 @@ import {
} from './picker';
import {
SubchannelAddress,
subchannelAddressEqual,
subchannelAddressToString,
} from './subchannel-address';
import * as logging from './logging';
@ -168,7 +169,7 @@ export class PickFirstLoadBalancer implements LoadBalancer {
* connecting to the next one instead of waiting for the connection
* delay timer. */
if (
subchannel === this.subchannels[this.currentSubchannelIndex] &&
subchannel.getRealSubchannel() === this.subchannels[this.currentSubchannelIndex].getRealSubchannel() &&
newState === ConnectivityState.TRANSIENT_FAILURE
) {
this.startNextSubchannelConnecting();
@ -419,8 +420,9 @@ export class PickFirstLoadBalancer implements LoadBalancer {
* address list is different from the existing one */
if (
this.subchannels.length === 0 ||
this.latestAddressList.length !== addressList.length ||
!this.latestAddressList.every(
(value, index) => addressList[index] === value
(value, index) => addressList[index] && subchannelAddressEqual(addressList[index], value)
)
) {
this.latestAddressList = addressList;

View File

@ -102,6 +102,7 @@ export class LoadBalancingCall implements Call {
if (!this.metadata) {
throw new Error('doPick called before start');
}
this.trace('Pick called')
const pickResult = this.channel.doPick(this.metadata, this.callConfig.pickInformation);
const subchannelString = pickResult.subchannel ?
'(' + pickResult.subchannel.getChannelzRef().id + ') ' + pickResult.subchannel.getAddress() :

View File

@ -98,6 +98,7 @@ class DnsResolver implements Resolver {
private continueResolving = false;
private nextResolutionTimer: NodeJS.Timer;
private isNextResolutionTimerRunning = false;
private isServiceConfigEnabled = true;
constructor(
private target: GrpcUri,
private listener: ResolverListener,
@ -127,6 +128,10 @@ class DnsResolver implements Resolver {
}
this.percentage = Math.random() * 100;
if (channelOptions['grpc.service_config_disable_resolution'] === 1) {
this.isServiceConfigEnabled = false;
}
this.defaultResolutionError = {
code: Status.UNAVAILABLE,
details: `Name resolution failed for target ${uriToString(this.target)}`,
@ -255,7 +260,7 @@ class DnsResolver implements Resolver {
);
/* If there already is a still-pending TXT resolution, we can just use
* that result when it comes in */
if (this.pendingTxtPromise === null) {
if (this.isServiceConfigEnabled && this.pendingTxtPromise === null) {
/* We handle the TXT query promise differently than the others because
* the name resolution attempt as a whole is a success even if the TXT
* lookup fails */

View File

@ -18,7 +18,7 @@
import { CallCredentials } from "./call-credentials";
import { Call, CallStreamOptions, InterceptingListener, MessageContext, StatusObject } from "./call-interface";
import { LogVerbosity, Propagate, Status } from "./constants";
import { Deadline, getDeadlineTimeoutString, getRelativeTimeout, minDeadline } from "./deadline";
import { Deadline, deadlineToString, getDeadlineTimeoutString, getRelativeTimeout, minDeadline } from "./deadline";
import { FilterStack, FilterStackFactory } from "./filter-stack";
import { InternalChannel } from "./internal-channel";
import { Metadata } from "./metadata";
@ -79,9 +79,9 @@ export class ResolvingCall implements Call {
private runDeadlineTimer() {
clearTimeout(this.deadlineTimer);
this.trace('Deadline: ' + this.deadline);
if (this.deadline !== Infinity) {
const timeout = getRelativeTimeout(this.deadline);
this.trace('Deadline: ' + deadlineToString(this.deadline));
const timeout = getRelativeTimeout(this.deadline);
if (timeout !== Infinity) {
this.trace('Deadline will be reached in ' + timeout + 'ms');
const handleDeadline = () => {
this.cancelWithStatus(
@ -103,6 +103,7 @@ export class ResolvingCall implements Call {
if (!this.filterStack) {
this.filterStack = this.filterStackFactory.createFilter();
}
clearTimeout(this.deadlineTimer);
const filteredStatus = this.filterStack.receiveTrailers(status);
this.trace('ended with status: code=' + filteredStatus.code + ' details="' + filteredStatus.details + '"');
this.statusWatchers.forEach(watcher => watcher(filteredStatus));
@ -177,13 +178,17 @@ export class ResolvingCall implements Call {
this.filterStack = this.filterStackFactory.createFilter();
this.filterStack.sendMetadata(Promise.resolve(this.metadata)).then(filteredMetadata => {
this.child = this.channel.createInnerCall(config, this.method, this.host, this.credentials, this.deadline);
this.trace('Created child [' + this.child.getCallNumber() + ']')
this.child.start(filteredMetadata, {
onReceiveMetadata: metadata => {
this.trace('Received metadata')
this.listener!.onReceiveMetadata(this.filterStack!.receiveMetadata(metadata));
},
onReceiveMessage: message => {
this.trace('Received message');
this.readFilterPending = true;
this.filterStack!.receiveMessage(message).then(filteredMesssage => {
this.trace('Finished filtering received message');
this.readFilterPending = false;
this.listener!.onReceiveMessage(filteredMesssage);
if (this.pendingChildStatus) {
@ -194,6 +199,7 @@ export class ResolvingCall implements Call {
});
},
onReceiveStatus: status => {
this.trace('Received status');
if (this.readFilterPending) {
this.pendingChildStatus = status;
} else {

View File

@ -151,6 +151,12 @@ export class RetryingCall implements Call {
private initialMetadata: Metadata | null = null;
private underlyingCalls: UnderlyingCall[] = [];
private writeBuffer: WriteBufferEntry[] = [];
/**
* The offset of message indices in the writeBuffer. For example, if
* writeBufferOffset is 10, message 10 is in writeBuffer[0] and message 15
* is in writeBuffer[5].
*/
private writeBufferOffset = 0;
/**
* Tracks whether a read has been started, so that we know whether to start
* reads on new child calls. This only matters for the first read, because
@ -202,8 +208,16 @@ export class RetryingCall implements Call {
private reportStatus(statusObject: StatusObject) {
this.trace('ended with status: code=' + statusObject.code + ' details="' + statusObject.details + '"');
this.bufferTracker.freeAll(this.callNumber);
this.writeBufferOffset = this.writeBufferOffset + this.writeBuffer.length;
this.writeBuffer = [];
process.nextTick(() => {
this.listener?.onReceiveStatus(statusObject);
// Explicitly construct status object to remove progress field
this.listener?.onReceiveStatus({
code: statusObject.code,
details: statusObject.details,
metadata: statusObject.metadata
});
});
}
@ -222,20 +236,27 @@ export class RetryingCall implements Call {
}
}
private maybefreeMessageBufferEntry(messageIndex: number) {
private getBufferEntry(messageIndex: number): WriteBufferEntry {
return this.writeBuffer[messageIndex - this.writeBufferOffset] ?? {entryType: 'FREED', allocated: false};
}
private getNextBufferIndex() {
return this.writeBufferOffset + this.writeBuffer.length;
}
private clearSentMessages() {
if (this.state !== 'COMMITTED') {
return;
}
const bufferEntry = this.writeBuffer[messageIndex];
if (bufferEntry.entryType === 'MESSAGE') {
const earliestNeededMessageIndex = this.underlyingCalls[this.committedCallIndex!].nextMessageToSend;
for (let messageIndex = this.writeBufferOffset; messageIndex < earliestNeededMessageIndex; messageIndex++) {
const bufferEntry = this.getBufferEntry(messageIndex);
if (bufferEntry.allocated) {
this.bufferTracker.free(bufferEntry.message!.message.length, this.callNumber);
}
this.writeBuffer[messageIndex] = {
entryType: 'FREED',
allocated: false
};
}
this.writeBuffer = this.writeBuffer.slice(earliestNeededMessageIndex - this.writeBufferOffset);
this.writeBufferOffset = earliestNeededMessageIndex;
}
private commitCall(index: number) {
@ -258,21 +279,28 @@ export class RetryingCall implements Call {
this.underlyingCalls[i].state = 'COMPLETED';
this.underlyingCalls[i].call.cancelWithStatus(Status.CANCELLED, 'Discarded in favor of other hedged attempt');
}
for (let messageIndex = 0; messageIndex < this.underlyingCalls[index].nextMessageToSend - 1; messageIndex += 1) {
this.maybefreeMessageBufferEntry(messageIndex);
}
this.clearSentMessages();
}
private commitCallWithMostMessages() {
if (this.state === 'COMMITTED') {
return;
}
let mostMessages = -1;
let callWithMostMessages = -1;
for (const [index, childCall] of this.underlyingCalls.entries()) {
if (childCall.nextMessageToSend > mostMessages) {
if (childCall.state === 'ACTIVE' && childCall.nextMessageToSend > mostMessages) {
mostMessages = childCall.nextMessageToSend;
callWithMostMessages = index;
}
}
this.commitCall(callWithMostMessages);
if (callWithMostMessages === -1) {
/* There are no active calls, disable retries to force the next call that
* is started to be committed. */
this.state = 'TRANSPARENT_ONLY';
} else {
this.commitCall(callWithMostMessages);
}
}
private isStatusCodeInList(list: (Status | string)[], code: Status) {
@ -532,8 +560,8 @@ export class RetryingCall implements Call {
private handleChildWriteCompleted(childIndex: number) {
const childCall = this.underlyingCalls[childIndex];
const messageIndex = childCall.nextMessageToSend;
this.writeBuffer[messageIndex].callback?.();
this.maybefreeMessageBufferEntry(messageIndex);
this.getBufferEntry(messageIndex).callback?.();
this.clearSentMessages();
childCall.nextMessageToSend += 1;
this.sendNextChildMessage(childIndex);
}
@ -543,10 +571,10 @@ export class RetryingCall implements Call {
if (childCall.state === 'COMPLETED') {
return;
}
if (this.writeBuffer[childCall.nextMessageToSend]) {
const bufferEntry = this.writeBuffer[childCall.nextMessageToSend];
if (this.getBufferEntry(childCall.nextMessageToSend)) {
const bufferEntry = this.getBufferEntry(childCall.nextMessageToSend);
switch (bufferEntry.entryType) {
case 'MESSAGE':
case 'MESSAGE':
childCall.call.sendMessageWithContext({
callback: (error) => {
// Ignore error
@ -571,13 +599,13 @@ export class RetryingCall implements Call {
message,
flags: context.flags,
};
const messageIndex = this.writeBuffer.length;
const messageIndex = this.getNextBufferIndex();
const bufferEntry: WriteBufferEntry = {
entryType: 'MESSAGE',
message: writeObj,
allocated: this.bufferTracker.allocate(message.length, this.callNumber)
};
this.writeBuffer[messageIndex] = bufferEntry;
this.writeBuffer.push(bufferEntry);
if (bufferEntry.allocated) {
context.callback?.();
for (const [callIndex, call] of this.underlyingCalls.entries()) {
@ -592,7 +620,11 @@ export class RetryingCall implements Call {
}
} else {
this.commitCallWithMostMessages();
const call = this.underlyingCalls[this.committedCallIndex!];
// commitCallWithMostMessages can fail if we are between ping attempts
if (this.committedCallIndex === null) {
return;
}
const call = this.underlyingCalls[this.committedCallIndex];
bufferEntry.callback = context.callback;
if (call.state === 'ACTIVE' && call.nextMessageToSend === messageIndex) {
call.call.sendMessageWithContext({
@ -615,11 +647,11 @@ export class RetryingCall implements Call {
}
halfClose(): void {
this.trace('halfClose called');
const halfCloseIndex = this.writeBuffer.length;
this.writeBuffer[halfCloseIndex] = {
const halfCloseIndex = this.getNextBufferIndex();
this.writeBuffer.push({
entryType: 'HALF_CLOSE',
allocated: false
};
});
for (const call of this.underlyingCalls) {
if (call?.state === 'ACTIVE' && call.nextMessageToSend === halfCloseIndex) {
call.nextMessageToSend += 1;

View File

@ -63,11 +63,13 @@ const deadlineUnitsToMs: DeadlineUnitIndexSignature = {
u: 0.001,
n: 0.000001,
};
const defaultResponseHeaders = {
const defaultCompressionHeaders = {
// TODO(cjihrig): Remove these encoding headers from the default response
// once compression is integrated.
[GRPC_ACCEPT_ENCODING_HEADER]: 'identity,deflate,gzip',
[GRPC_ENCODING_HEADER]: 'identity',
}
const defaultResponseHeaders = {
[http2.constants.HTTP2_HEADER_STATUS]: http2.constants.HTTP_STATUS_OK,
[http2.constants.HTTP2_HEADER_CONTENT_TYPE]: 'application/grpc+proto',
};
@ -500,7 +502,7 @@ export class Http2ServerCallStream<
this.metadataSent = true;
const custom = customMetadata ? customMetadata.toHttp2Headers() : null;
// TODO(cjihrig): Include compression headers.
const headers = { ...defaultResponseHeaders, ...custom };
const headers = { ...defaultResponseHeaders, ...defaultCompressionHeaders, ...custom };
this.stream.respond(headers, defaultResponseOptions);
}
@ -725,9 +727,11 @@ export class Http2ServerCallStream<
this.stream.end();
}
} else {
// Trailers-only response
const trailersToSend = {
[GRPC_STATUS_HEADER]: statusObj.code,
[GRPC_MESSAGE_HEADER]: encodeURI(statusObj.details),
...defaultResponseHeaders,
...statusObj.metadata?.toHttp2Headers(),
};
this.stream.respond(trailersToSend, {endStream: true});

View File

@ -41,9 +41,15 @@ export function isTcpSubchannelAddress(
}
export function subchannelAddressEqual(
address1: SubchannelAddress,
address2: SubchannelAddress
address1?: SubchannelAddress,
address2?: SubchannelAddress
): boolean {
if (!address1 && !address2) {
return true;
}
if (!address1 || !address2) {
return false;
}
if (isTcpSubchannelAddress(address1)) {
return (
isTcpSubchannelAddress(address2) &&

View File

@ -21,12 +21,12 @@ import * as os from 'os';
import { Status } from './constants';
import { Metadata } from './metadata';
import { StreamDecoder } from './stream-decoder';
import { SubchannelCallStatsTracker, Subchannel } from './subchannel';
import * as logging from './logging';
import { LogVerbosity } from './constants';
import { ServerSurfaceCall } from './server-call';
import { Deadline } from './deadline';
import { InterceptingListener, MessageContext, StatusObject, WriteCallback } from './call-interface';
import { CallEventTracker, Transport } from './transport';
const TRACER_NAME = 'subchannel_call';
@ -87,6 +87,7 @@ export class Http2SubchannelCall implements SubchannelCall {
private decoder = new StreamDecoder();
private isReadFilterPending = false;
private isPushPending = false;
private canPush = false;
/**
* Indicates that an 'end' event has come from the http2 stream, so there
@ -104,26 +105,15 @@ export class Http2SubchannelCall implements SubchannelCall {
// This is populated (non-null) if and only if the call has ended
private finalStatus: StatusObject | null = null;
private disconnectListener: () => void;
private internalError: SystemError | null = null;
constructor(
private readonly http2Stream: http2.ClientHttp2Stream,
private readonly callStatsTracker: SubchannelCallStatsTracker,
private readonly callEventTracker: CallEventTracker,
private readonly listener: SubchannelCallInterceptingListener,
private readonly subchannel: Subchannel,
private readonly transport: Transport,
private readonly callId: number
) {
this.disconnectListener = () => {
this.endCall({
code: Status.UNAVAILABLE,
details: 'Connection dropped',
metadata: new Metadata(),
});
};
subchannel.addDisconnectListener(this.disconnectListener);
subchannel.callRef();
http2Stream.on('response', (headers, flags) => {
let headersString = '';
for (const header of Object.keys(headers)) {
@ -185,7 +175,7 @@ export class Http2SubchannelCall implements SubchannelCall {
for (const message of messages) {
this.trace('parsed message of length ' + message.length);
this.callStatsTracker!.addMessageReceived();
this.callEventTracker!.addMessageReceived();
this.tryPush(message);
}
});
@ -289,7 +279,15 @@ export class Http2SubchannelCall implements SubchannelCall {
);
this.internalError = err;
}
this.callStatsTracker.onStreamEnd(false);
this.callEventTracker.onStreamEnd(false);
});
}
public onDisconnect() {
this.endCall({
code: Status.UNAVAILABLE,
details: 'Connection dropped',
metadata: new Metadata(),
});
}
@ -304,7 +302,7 @@ export class Http2SubchannelCall implements SubchannelCall {
this.finalStatus!.details +
'"'
);
this.callStatsTracker.onCallEnd(this.finalStatus!);
this.callEventTracker.onCallEnd(this.finalStatus!);
/* We delay the actual action of bubbling up the status to insulate the
* cleanup code in this class from any errors that may be thrown in the
* upper layers as a result of bubbling up the status. In particular,
@ -319,8 +317,6 @@ export class Http2SubchannelCall implements SubchannelCall {
* not push more messages after the status is output, so the messages go
* nowhere either way. */
this.http2Stream.resume();
this.subchannel.callUnref();
this.subchannel.removeDisconnectListener(this.disconnectListener);
}
}
@ -356,7 +352,8 @@ export class Http2SubchannelCall implements SubchannelCall {
this.finalStatus.code !== Status.OK ||
(this.readsClosed &&
this.unpushedReadMessages.length === 0 &&
!this.isReadFilterPending)
!this.isReadFilterPending &&
!this.isPushPending)
) {
this.outputStatus();
}
@ -369,7 +366,9 @@ export class Http2SubchannelCall implements SubchannelCall {
(message instanceof Buffer ? message.length : null)
);
this.canPush = false;
this.isPushPending = true;
process.nextTick(() => {
this.isPushPending = false;
/* If we have already output the status any later messages should be
* ignored, and can cause out-of-order operation errors higher up in the
* stack. Checking as late as possible here to avoid any race conditions.
@ -395,7 +394,7 @@ export class Http2SubchannelCall implements SubchannelCall {
}
private handleTrailers(headers: http2.IncomingHttpHeaders) {
this.callStatsTracker.onStreamEnd(true);
this.callEventTracker.onStreamEnd(true);
let headersString = '';
for (const header of Object.keys(headers)) {
headersString += '\t\t' + header + ': ' + headers[header] + '\n';
@ -467,7 +466,7 @@ export class Http2SubchannelCall implements SubchannelCall {
}
getPeer(): string {
return this.subchannel.getAddress();
return this.transport.getPeerName();
}
getCallNumber(): number {
@ -506,7 +505,7 @@ export class Http2SubchannelCall implements SubchannelCall {
context.callback?.();
};
this.trace('sending data chunk of length ' + message.length);
this.callStatsTracker.addMessageSent();
this.callEventTracker.addMessageSent();
try {
this.http2Stream!.write(message, cb);
} catch (error) {

View File

@ -22,7 +22,8 @@ import { Subchannel } from "./subchannel";
export type ConnectivityStateListener = (
subchannel: SubchannelInterface,
previousState: ConnectivityState,
newState: ConnectivityState
newState: ConnectivityState,
keepaliveTime: number
) => void;
/**
@ -40,6 +41,7 @@ export interface SubchannelInterface {
removeConnectivityStateListener(listener: ConnectivityStateListener): void;
startConnecting(): void;
getAddress(): string;
throttleKeepalive(newKeepaliveTime: number): void;
ref(): void;
unref(): void;
getChannelzRef(): SubchannelRef;
@ -67,6 +69,9 @@ export abstract class BaseSubchannelWrapper implements SubchannelInterface {
getAddress(): string {
return this.child.getAddress();
}
throttleKeepalive(newKeepaliveTime: number): void {
this.child.throttleKeepalive(newKeepaliveTime);
}
ref(): void {
this.child.ref();
}

View File

@ -23,6 +23,7 @@ import {
} from './subchannel-address';
import { ChannelCredentials } from './channel-credentials';
import { GrpcUri, uriToString } from './uri-parser';
import { Http2SubchannelConnector } from './transport';
// 10 seconds in milliseconds. This value is arbitrary.
/**
@ -143,7 +144,8 @@ export class SubchannelPool {
channelTargetUri,
subchannelTarget,
channelArguments,
channelCredentials
channelCredentials,
new Http2SubchannelConnector(channelTargetUri)
);
if (!(channelTarget in this.pool)) {
this.pool[channelTarget] = [];

View File

@ -15,60 +15,30 @@
*
*/
import * as http2 from 'http2';
import { ChannelCredentials } from './channel-credentials';
import { Metadata } from './metadata';
import { ChannelOptions } from './channel-options';
import { PeerCertificate, checkServerIdentity, TLSSocket, CipherNameAndProtocol } from 'tls';
import { ConnectivityState } from './connectivity-state';
import { BackoffTimeout, BackoffOptions } from './backoff-timeout';
import { getDefaultAuthority } from './resolver';
import * as logging from './logging';
import { LogVerbosity, Status } from './constants';
import { getProxiedConnection, ProxyConnectionResult } from './http_proxy';
import * as net from 'net';
import { GrpcUri, parseUri, splitHostPort, uriToString } from './uri-parser';
import { ConnectionOptions } from 'tls';
import { GrpcUri, uriToString } from './uri-parser';
import {
stringToSubchannelAddress,
SubchannelAddress,
subchannelAddressToString,
} from './subchannel-address';
import { SubchannelRef, ChannelzTrace, ChannelzChildrenTracker, SubchannelInfo, registerChannelzSubchannel, ChannelzCallTracker, SocketInfo, SocketRef, unregisterChannelzRef, registerChannelzSocket, TlsInfo } from './channelz';
import { SubchannelRef, ChannelzTrace, ChannelzChildrenTracker, SubchannelInfo, registerChannelzSubchannel, ChannelzCallTracker, unregisterChannelzRef } from './channelz';
import { ConnectivityStateListener } from './subchannel-interface';
import { Http2SubchannelCall, SubchannelCallInterceptingListener } from './subchannel-call';
import { getNextCallNumber } from './call-number';
import { SubchannelCallInterceptingListener } from './subchannel-call';
import { SubchannelCall } from './subchannel-call';
import { InterceptingListener, StatusObject } from './call-interface';
const clientVersion = require('../../package.json').version;
import { CallEventTracker, SubchannelConnector, Transport } from './transport';
const TRACER_NAME = 'subchannel';
const FLOW_CONTROL_TRACER_NAME = 'subchannel_flowctrl';
/* setInterval and setTimeout only accept signed 32 bit integers. JS doesn't
* have a constant for the max signed 32 bit integer, so this is a simple way
* to calculate it */
const KEEPALIVE_MAX_TIME_MS = ~(1 << 31);
const KEEPALIVE_TIMEOUT_MS = 20000;
export interface SubchannelCallStatsTracker {
addMessageSent(): void;
addMessageReceived(): void;
onCallEnd(status: StatusObject): void;
onStreamEnd(success: boolean): void;
}
const {
HTTP2_HEADER_AUTHORITY,
HTTP2_HEADER_CONTENT_TYPE,
HTTP2_HEADER_METHOD,
HTTP2_HEADER_PATH,
HTTP2_HEADER_TE,
HTTP2_HEADER_USER_AGENT,
} = http2.constants;
const tooManyPingsData: Buffer = Buffer.from('too_many_pings', 'ascii');
export class Subchannel {
/**
@ -79,7 +49,7 @@ export class Subchannel {
/**
* The underlying http2 session used to make requests.
*/
private session: http2.ClientHttp2Session | null = null;
private transport: Transport | null = null;
/**
* Indicates that the subchannel should transition from TRANSIENT_FAILURE to
* CONNECTING instead of IDLE when the backoff timeout ends.
@ -92,45 +62,9 @@ export class Subchannel {
*/
private stateListeners: ConnectivityStateListener[] = [];
/**
* A list of listener functions that will be called when the underlying
* socket disconnects. Used for ending active calls with an UNAVAILABLE
* status.
*/
private disconnectListeners: Set<() => void> = new Set();
private backoffTimeout: BackoffTimeout;
/**
* The complete user agent string constructed using channel args.
*/
private userAgent: string;
/**
* The amount of time in between sending pings
*/
private keepaliveTimeMs: number = KEEPALIVE_MAX_TIME_MS;
/**
* The amount of time to wait for an acknowledgement after sending a ping
*/
private keepaliveTimeoutMs: number = KEEPALIVE_TIMEOUT_MS;
/**
* Timer reference for timeout that indicates when to send the next ping
*/
private keepaliveIntervalId: NodeJS.Timer;
/**
* Timer reference tracking when the most recent ping will be considered lost
*/
private keepaliveTimeoutId: NodeJS.Timer;
/**
* Indicates whether keepalive pings should be sent without any active calls
*/
private keepaliveWithoutCalls = false;
/**
* Tracks calls with references to this subchannel
*/
private callRefcount = 0;
private keepaliveTime: number;
/**
* Tracks channels and subchannel pools with references to this subchannel
*/
@ -149,18 +83,7 @@ export class Subchannel {
private childrenTracker = new ChannelzChildrenTracker();
// Channelz socket info
private channelzSocketRef: SocketRef | null = null;
/**
* Name of the remote server, if it is not the same as the subchannel
* address, i.e. if connecting through an HTTP CONNECT proxy.
*/
private remoteName: string | null = null;
private streamTracker = new ChannelzCallTracker();
private keepalivesSent = 0;
private messagesSent = 0;
private messagesReceived = 0;
private lastMessageSentTimestamp: Date | null = null;
private lastMessageReceivedTimestamp: Date | null = null;
/**
* A class representing a connection to a single backend.
@ -176,33 +99,9 @@ export class Subchannel {
private channelTarget: GrpcUri,
private subchannelAddress: SubchannelAddress,
private options: ChannelOptions,
private credentials: ChannelCredentials
private credentials: ChannelCredentials,
private connector: SubchannelConnector
) {
// Build user-agent string.
this.userAgent = [
options['grpc.primary_user_agent'],
`grpc-node-js/${clientVersion}`,
options['grpc.secondary_user_agent'],
]
.filter((e) => e)
.join(' '); // remove falsey values first
if ('grpc.keepalive_time_ms' in options) {
this.keepaliveTimeMs = options['grpc.keepalive_time_ms']!;
}
if ('grpc.keepalive_timeout_ms' in options) {
this.keepaliveTimeoutMs = options['grpc.keepalive_timeout_ms']!;
}
if ('grpc.keepalive_permit_without_calls' in options) {
this.keepaliveWithoutCalls =
options['grpc.keepalive_permit_without_calls'] === 1;
} else {
this.keepaliveWithoutCalls = false;
}
this.keepaliveIntervalId = setTimeout(() => {}, 0);
clearTimeout(this.keepaliveIntervalId);
this.keepaliveTimeoutId = setTimeout(() => {}, 0);
clearTimeout(this.keepaliveTimeoutId);
const backoffOptions: BackoffOptions = {
initialDelay: options['grpc.initial_reconnect_backoff_ms'],
maxDelay: options['grpc.max_reconnect_backoff_ms'],
@ -212,6 +111,8 @@ export class Subchannel {
}, backoffOptions);
this.subchannelAddressString = subchannelAddressToString(subchannelAddress);
this.keepaliveTime = options['grpc.keepalive_time_ms'] ?? -1;
if (options['grpc.enable_channelz'] === 0) {
this.channelzEnabled = false;
}
@ -233,67 +134,6 @@ export class Subchannel {
};
}
private getChannelzSocketInfo(): SocketInfo | null {
if (this.session === null) {
return null;
}
const sessionSocket = this.session.socket;
const remoteAddress = sessionSocket.remoteAddress ? stringToSubchannelAddress(sessionSocket.remoteAddress, sessionSocket.remotePort) : null;
const localAddress = sessionSocket.localAddress ? stringToSubchannelAddress(sessionSocket.localAddress, sessionSocket.localPort) : null;
let tlsInfo: TlsInfo | null;
if (this.session.encrypted) {
const tlsSocket: TLSSocket = sessionSocket as TLSSocket;
const cipherInfo: CipherNameAndProtocol & {standardName?: string} = tlsSocket.getCipher();
const certificate = tlsSocket.getCertificate();
const peerCertificate = tlsSocket.getPeerCertificate();
tlsInfo = {
cipherSuiteStandardName: cipherInfo.standardName ?? null,
cipherSuiteOtherName: cipherInfo.standardName ? null : cipherInfo.name,
localCertificate: (certificate && 'raw' in certificate) ? certificate.raw : null,
remoteCertificate: (peerCertificate && 'raw' in peerCertificate) ? peerCertificate.raw : null
};
} else {
tlsInfo = null;
}
const socketInfo: SocketInfo = {
remoteAddress: remoteAddress,
localAddress: localAddress,
security: tlsInfo,
remoteName: this.remoteName,
streamsStarted: this.streamTracker.callsStarted,
streamsSucceeded: this.streamTracker.callsSucceeded,
streamsFailed: this.streamTracker.callsFailed,
messagesSent: this.messagesSent,
messagesReceived: this.messagesReceived,
keepAlivesSent: this.keepalivesSent,
lastLocalStreamCreatedTimestamp: this.streamTracker.lastCallStartedTimestamp,
lastRemoteStreamCreatedTimestamp: null,
lastMessageSentTimestamp: this.lastMessageSentTimestamp,
lastMessageReceivedTimestamp: this.lastMessageReceivedTimestamp,
localFlowControlWindow: this.session.state.localWindowSize ?? null,
remoteFlowControlWindow: this.session.state.remoteWindowSize ?? null
};
return socketInfo;
}
private resetChannelzSocketInfo() {
if (!this.channelzEnabled) {
return;
}
if (this.channelzSocketRef) {
unregisterChannelzRef(this.channelzSocketRef);
this.childrenTracker.unrefChild(this.channelzSocketRef);
this.channelzSocketRef = null;
}
this.remoteName = null;
this.streamTracker = new ChannelzCallTracker();
this.keepalivesSent = 0;
this.messagesSent = 0;
this.messagesReceived = 0;
this.lastMessageSentTimestamp = null;
this.lastMessageReceivedTimestamp = null;
}
private trace(text: string): void {
logging.trace(LogVerbosity.DEBUG, TRACER_NAME, '(' + this.channelzRef.id + ') ' + this.subchannelAddressString + ' ' + text);
}
@ -302,18 +142,6 @@ export class Subchannel {
logging.trace(LogVerbosity.DEBUG, 'subchannel_refcount', '(' + this.channelzRef.id + ') ' + this.subchannelAddressString + ' ' + text);
}
private flowControlTrace(text: string): void {
logging.trace(LogVerbosity.DEBUG, FLOW_CONTROL_TRACER_NAME, '(' + this.channelzRef.id + ') ' + this.subchannelAddressString + ' ' + text);
}
private internalsTrace(text: string): void {
logging.trace(LogVerbosity.DEBUG, 'subchannel_internals', '(' + this.channelzRef.id + ') ' + this.subchannelAddressString + ' ' + text);
}
private keepaliveTrace(text: string): void {
logging.trace(LogVerbosity.DEBUG, 'keepalive', '(' + this.channelzRef.id + ') ' + this.subchannelAddressString + ' ' + text);
}
private handleBackoffTimer() {
if (this.continueConnecting) {
this.transitionToState(
@ -340,313 +168,39 @@ export class Subchannel {
this.backoffTimeout.reset();
}
private sendPing() {
if (this.channelzEnabled) {
this.keepalivesSent += 1;
}
this.keepaliveTrace('Sending ping with timeout ' + this.keepaliveTimeoutMs + 'ms');
this.keepaliveTimeoutId = setTimeout(() => {
this.keepaliveTrace('Ping timeout passed without response');
this.handleDisconnect();
}, this.keepaliveTimeoutMs);
this.keepaliveTimeoutId.unref?.();
try {
this.session!.ping(
(err: Error | null, duration: number, payload: Buffer) => {
this.keepaliveTrace('Received ping response');
clearTimeout(this.keepaliveTimeoutId);
}
);
} catch (e) {
/* If we fail to send a ping, the connection is no longer functional, so
* we should discard it. */
this.transitionToState(
[ConnectivityState.READY],
ConnectivityState.TRANSIENT_FAILURE
);
}
}
private startKeepalivePings() {
this.keepaliveIntervalId = setInterval(() => {
this.sendPing();
}, this.keepaliveTimeMs);
this.keepaliveIntervalId.unref?.();
/* Don't send a ping immediately because whatever caused us to start
* sending pings should also involve some network activity. */
}
/**
* Stop keepalive pings when terminating a connection. This discards the
* outstanding ping timeout, so it should not be called if the same
* connection will still be used.
*/
private stopKeepalivePings() {
clearInterval(this.keepaliveIntervalId);
clearTimeout(this.keepaliveTimeoutId);
}
private createSession(proxyConnectionResult: ProxyConnectionResult) {
if (proxyConnectionResult.realTarget) {
this.remoteName = uriToString(proxyConnectionResult.realTarget);
this.trace('creating HTTP/2 session through proxy to ' + proxyConnectionResult.realTarget);
} else {
this.remoteName = null;
this.trace('creating HTTP/2 session');
}
const targetAuthority = getDefaultAuthority(
proxyConnectionResult.realTarget ?? this.channelTarget
);
let connectionOptions: http2.SecureClientSessionOptions =
this.credentials._getConnectionOptions() || {};
connectionOptions.maxSendHeaderBlockLength = Number.MAX_SAFE_INTEGER;
if ('grpc-node.max_session_memory' in this.options) {
connectionOptions.maxSessionMemory = this.options[
'grpc-node.max_session_memory'
];
} else {
/* By default, set a very large max session memory limit, to effectively
* disable enforcement of the limit. Some testing indicates that Node's
* behavior degrades badly when this limit is reached, so we solve that
* by disabling the check entirely. */
connectionOptions.maxSessionMemory = Number.MAX_SAFE_INTEGER;
}
let addressScheme = 'http://';
if ('secureContext' in connectionOptions) {
addressScheme = 'https://';
// If provided, the value of grpc.ssl_target_name_override should be used
// to override the target hostname when checking server identity.
// This option is used for testing only.
if (this.options['grpc.ssl_target_name_override']) {
const sslTargetNameOverride = this.options[
'grpc.ssl_target_name_override'
]!;
connectionOptions.checkServerIdentity = (
host: string,
cert: PeerCertificate
): Error | undefined => {
return checkServerIdentity(sslTargetNameOverride, cert);
};
connectionOptions.servername = sslTargetNameOverride;
} else {
const authorityHostname =
splitHostPort(targetAuthority)?.host ?? 'localhost';
// We want to always set servername to support SNI
connectionOptions.servername = authorityHostname;
}
if (proxyConnectionResult.socket) {
/* This is part of the workaround for
* https://github.com/nodejs/node/issues/32922. Without that bug,
* proxyConnectionResult.socket would always be a plaintext socket and
* this would say
* connectionOptions.socket = proxyConnectionResult.socket; */
connectionOptions.createConnection = (authority, option) => {
return proxyConnectionResult.socket!;
};
}
} else {
/* In all but the most recent versions of Node, http2.connect does not use
* the options when establishing plaintext connections, so we need to
* establish that connection explicitly. */
connectionOptions.createConnection = (authority, option) => {
if (proxyConnectionResult.socket) {
return proxyConnectionResult.socket;
} else {
/* net.NetConnectOpts is declared in a way that is more restrictive
* than what net.connect will actually accept, so we use the type
* assertion to work around that. */
return net.connect(this.subchannelAddress);
}
};
}
connectionOptions = {
...connectionOptions,
...this.subchannelAddress,
};
/* http2.connect uses the options here:
* https://github.com/nodejs/node/blob/70c32a6d190e2b5d7b9ff9d5b6a459d14e8b7d59/lib/internal/http2/core.js#L3028-L3036
* The spread operator overides earlier values with later ones, so any port
* or host values in the options will be used rather than any values extracted
* from the first argument. In addition, the path overrides the host and port,
* as documented for plaintext connections here:
* https://nodejs.org/api/net.html#net_socket_connect_options_connectlistener
* and for TLS connections here:
* https://nodejs.org/api/tls.html#tls_tls_connect_options_callback. In
* earlier versions of Node, http2.connect passes these options to
* tls.connect but not net.connect, so in the insecure case we still need
* to set the createConnection option above to create the connection
* explicitly. We cannot do that in the TLS case because http2.connect
* passes necessary additional options to tls.connect.
* The first argument just needs to be parseable as a URL and the scheme
* determines whether the connection will be established over TLS or not.
*/
const session = http2.connect(
addressScheme + targetAuthority,
connectionOptions
);
this.session = session;
this.channelzSocketRef = registerChannelzSocket(this.subchannelAddressString, () => this.getChannelzSocketInfo()!, this.channelzEnabled);
if (this.channelzEnabled) {
this.childrenTracker.refChild(this.channelzSocketRef);
}
session.unref();
/* For all of these events, check if the session at the time of the event
* is the same one currently attached to this subchannel, to ensure that
* old events from previous connection attempts cannot cause invalid state
* transitions. */
session.once('connect', () => {
if (this.session === session) {
this.transitionToState(
[ConnectivityState.CONNECTING],
ConnectivityState.READY
);
}
});
session.once('close', () => {
if (this.session === session) {
this.trace('connection closed');
this.transitionToState(
[ConnectivityState.CONNECTING],
ConnectivityState.TRANSIENT_FAILURE
);
/* Transitioning directly to IDLE here should be OK because we are not
* doing any backoff, because a connection was established at some
* point */
this.transitionToState(
[ConnectivityState.READY],
ConnectivityState.IDLE
);
}
});
session.once(
'goaway',
(errorCode: number, lastStreamID: number, opaqueData: Buffer) => {
if (this.session === session) {
/* See the last paragraph of
* https://github.com/grpc/proposal/blob/master/A8-client-side-keepalive.md#basic-keepalive */
if (
errorCode === http2.constants.NGHTTP2_ENHANCE_YOUR_CALM &&
opaqueData.equals(tooManyPingsData)
) {
this.keepaliveTimeMs = Math.min(
2 * this.keepaliveTimeMs,
KEEPALIVE_MAX_TIME_MS
);
logging.log(
LogVerbosity.ERROR,
`Connection to ${uriToString(this.channelTarget)} at ${
this.subchannelAddressString
} rejected by server because of excess pings. Increasing ping interval to ${
this.keepaliveTimeMs
} ms`
);
}
this.trace(
'connection closed by GOAWAY with code ' +
errorCode
);
this.transitionToState(
[ConnectivityState.CONNECTING, ConnectivityState.READY],
ConnectivityState.IDLE
);
}
}
);
session.once('error', (error) => {
/* Do nothing here. Any error should also trigger a close event, which is
* where we want to handle that. */
this.trace(
'connection closed with error ' +
(error as Error).message
);
});
if (logging.isTracerEnabled(TRACER_NAME)) {
session.on('remoteSettings', (settings: http2.Settings) => {
this.trace(
'new settings received' +
(this.session !== session ? ' on the old connection' : '') +
': ' +
JSON.stringify(settings)
);
});
session.on('localSettings', (settings: http2.Settings) => {
this.trace(
'local settings acknowledged by remote' +
(this.session !== session ? ' on the old connection' : '') +
': ' +
JSON.stringify(settings)
);
});
}
}
private startConnectingInternal() {
/* Pass connection options through to the proxy so that it's able to
* upgrade it's connection to support tls if needed.
* This is a workaround for https://github.com/nodejs/node/issues/32922
* See https://github.com/grpc/grpc-node/pull/1369 for more info. */
const connectionOptions: ConnectionOptions =
this.credentials._getConnectionOptions() || {};
if ('secureContext' in connectionOptions) {
connectionOptions.ALPNProtocols = ['h2'];
// If provided, the value of grpc.ssl_target_name_override should be used
// to override the target hostname when checking server identity.
// This option is used for testing only.
if (this.options['grpc.ssl_target_name_override']) {
const sslTargetNameOverride = this.options[
'grpc.ssl_target_name_override'
]!;
connectionOptions.checkServerIdentity = (
host: string,
cert: PeerCertificate
): Error | undefined => {
return checkServerIdentity(sslTargetNameOverride, cert);
};
connectionOptions.servername = sslTargetNameOverride;
} else {
if ('grpc.http_connect_target' in this.options) {
/* This is more or less how servername will be set in createSession
* if a connection is successfully established through the proxy.
* If the proxy is not used, these connectionOptions are discarded
* anyway */
const targetPath = getDefaultAuthority(
parseUri(this.options['grpc.http_connect_target'] as string) ?? {
path: 'localhost',
let options = this.options;
if (options['grpc.keepalive_time_ms']) {
const adjustedKeepaliveTime = Math.min(this.keepaliveTime, KEEPALIVE_MAX_TIME_MS);
options = {...options, 'grpc.keepalive_time_ms': adjustedKeepaliveTime};
}
this.connector.connect(this.subchannelAddress, this.credentials, options).then(
transport => {
if (this.transitionToState([ConnectivityState.CONNECTING], ConnectivityState.READY)) {
this.transport = transport;
if (this.channelzEnabled) {
this.childrenTracker.refChild(transport.getChannelzRef());
}
transport.addDisconnectListener((tooManyPings) => {
this.transitionToState([ConnectivityState.READY], ConnectivityState.IDLE);
if (tooManyPings && this.keepaliveTime > 0) {
this.keepaliveTime *= 2;
logging.log(
LogVerbosity.ERROR,
`Connection to ${uriToString(this.channelTarget)} at ${
this.subchannelAddressString
} rejected by server because of excess pings. Increasing ping interval to ${
this.keepaliveTime
} ms`
);
}
);
const hostPort = splitHostPort(targetPath);
connectionOptions.servername = hostPort?.host ?? targetPath;
});
}
}
}
getProxiedConnection(
this.subchannelAddress,
this.options,
connectionOptions
).then(
(result) => {
this.createSession(result);
},
(reason) => {
this.transitionToState(
[ConnectivityState.CONNECTING],
ConnectivityState.TRANSIENT_FAILURE
);
error => {
this.transitionToState([ConnectivityState.CONNECTING], ConnectivityState.TRANSIENT_FAILURE);
}
);
}
private handleDisconnect() {
this.transitionToState(
[ConnectivityState.READY],
ConnectivityState.TRANSIENT_FAILURE);
for (const listener of this.disconnectListeners.values()) {
listener();
}
)
}
/**
@ -676,15 +230,6 @@ export class Subchannel {
switch (newState) {
case ConnectivityState.READY:
this.stopBackoff();
const session = this.session!;
session.socket.once('close', () => {
if (this.session === session) {
this.handleDisconnect();
}
});
if (this.keepaliveWithoutCalls) {
this.startKeepalivePings();
}
break;
case ConnectivityState.CONNECTING:
this.startBackoff();
@ -692,12 +237,11 @@ export class Subchannel {
this.continueConnecting = false;
break;
case ConnectivityState.TRANSIENT_FAILURE:
if (this.session) {
this.session.close();
if (this.channelzEnabled && this.transport) {
this.childrenTracker.unrefChild(this.transport.getChannelzRef());
}
this.session = null;
this.resetChannelzSocketInfo();
this.stopKeepalivePings();
this.transport?.shutdown();
this.transport = null;
/* If the backoff timer has already ended by the time we get to the
* TRANSIENT_FAILURE state, we want to immediately transition out of
* TRANSIENT_FAILURE as though the backoff timer is ending right now */
@ -708,12 +252,11 @@ export class Subchannel {
}
break;
case ConnectivityState.IDLE:
if (this.session) {
this.session.close();
if (this.channelzEnabled && this.transport) {
this.childrenTracker.unrefChild(this.transport.getChannelzRef());
}
this.session = null;
this.resetChannelzSocketInfo();
this.stopKeepalivePings();
this.transport?.shutdown();
this.transport = null;
break;
default:
throw new Error(`Invalid state: unknown ConnectivityState ${newState}`);
@ -721,71 +264,11 @@ export class Subchannel {
/* We use a shallow copy of the stateListeners array in case a listener
* is removed during this iteration */
for (const listener of [...this.stateListeners]) {
listener(this, previousState, newState);
listener(this, previousState, newState, this.keepaliveTime);
}
return true;
}
/**
* Check if the subchannel associated with zero calls and with zero channels.
* If so, shut it down.
*/
private checkBothRefcounts() {
/* If no calls, channels, or subchannel pools have any more references to
* this subchannel, we can be sure it will never be used again. */
if (this.callRefcount === 0 && this.refcount === 0) {
if (this.channelzEnabled) {
this.channelzTrace.addTrace('CT_INFO', 'Shutting down');
}
this.transitionToState(
[ConnectivityState.CONNECTING, ConnectivityState.READY],
ConnectivityState.IDLE
);
if (this.channelzEnabled) {
unregisterChannelzRef(this.channelzRef);
}
}
}
callRef() {
this.refTrace(
'callRefcount ' +
this.callRefcount +
' -> ' +
(this.callRefcount + 1)
);
if (this.callRefcount === 0) {
if (this.session) {
this.session.ref();
}
this.backoffTimeout.ref();
if (!this.keepaliveWithoutCalls) {
this.startKeepalivePings();
}
}
this.callRefcount += 1;
}
callUnref() {
this.refTrace(
'callRefcount ' +
this.callRefcount +
' -> ' +
(this.callRefcount - 1)
);
this.callRefcount -= 1;
if (this.callRefcount === 0) {
if (this.session) {
this.session.unref();
}
this.backoffTimeout.unref();
if (!this.keepaliveWithoutCalls) {
clearInterval(this.keepaliveIntervalId);
}
this.checkBothRefcounts();
}
}
ref() {
this.refTrace(
'refcount ' +
@ -804,7 +287,18 @@ export class Subchannel {
(this.refcount - 1)
);
this.refcount -= 1;
this.checkBothRefcounts();
if (this.refcount === 0) {
if (this.channelzEnabled) {
this.channelzTrace.addTrace('CT_INFO', 'Shutting down');
}
this.transitionToState(
[ConnectivityState.CONNECTING, ConnectivityState.READY],
ConnectivityState.IDLE
);
if (this.channelzEnabled) {
unregisterChannelzRef(this.channelzRef);
}
}
}
unrefIfOneRef(): boolean {
@ -816,83 +310,26 @@ export class Subchannel {
}
createCall(metadata: Metadata, host: string, method: string, listener: SubchannelCallInterceptingListener): SubchannelCall {
const headers = metadata.toHttp2Headers();
headers[HTTP2_HEADER_AUTHORITY] = host;
headers[HTTP2_HEADER_USER_AGENT] = this.userAgent;
headers[HTTP2_HEADER_CONTENT_TYPE] = 'application/grpc';
headers[HTTP2_HEADER_METHOD] = 'POST';
headers[HTTP2_HEADER_PATH] = method;
headers[HTTP2_HEADER_TE] = 'trailers';
let http2Stream: http2.ClientHttp2Stream;
/* In theory, if an error is thrown by session.request because session has
* become unusable (e.g. because it has received a goaway), this subchannel
* should soon see the corresponding close or goaway event anyway and leave
* READY. But we have seen reports that this does not happen
* (https://github.com/googleapis/nodejs-firestore/issues/1023#issuecomment-653204096)
* so for defense in depth, we just discard the session when we see an
* error here.
*/
try {
http2Stream = this.session!.request(headers);
} catch (e) {
this.transitionToState(
[ConnectivityState.READY],
ConnectivityState.TRANSIENT_FAILURE
);
throw e;
if (!this.transport) {
throw new Error('Cannot create call, subchannel not READY');
}
this.flowControlTrace(
'local window size: ' +
this.session!.state.localWindowSize +
' remote window size: ' +
this.session!.state.remoteWindowSize
);
const streamSession = this.session;
this.internalsTrace(
'session.closed=' +
streamSession!.closed +
' session.destroyed=' +
streamSession!.destroyed +
' session.socket.destroyed=' +
streamSession!.socket.destroyed);
let statsTracker: SubchannelCallStatsTracker;
let statsTracker: Partial<CallEventTracker>;
if (this.channelzEnabled) {
this.callTracker.addCallStarted();
this.streamTracker.addCallStarted();
statsTracker = {
addMessageSent: () => {
this.messagesSent += 1;
this.lastMessageSentTimestamp = new Date();
},
addMessageReceived: () => {
this.messagesReceived += 1;
},
onCallEnd: status => {
if (status.code === Status.OK) {
this.callTracker.addCallSucceeded();
} else {
this.callTracker.addCallFailed();
}
},
onStreamEnd: success => {
if (streamSession === this.session) {
if (success) {
this.streamTracker.addCallSucceeded();
} else {
this.streamTracker.addCallFailed();
}
}
}
}
} else {
statsTracker = {
addMessageSent: () => {},
addMessageReceived: () => {},
onCallEnd: () => {},
onStreamEnd: () => {}
}
statsTracker = {};
}
return new Http2SubchannelCall(http2Stream, statsTracker, listener, this, getNextCallNumber());
return this.transport.createCall(metadata, host, method, listener, statsTracker);
}
/**
@ -946,14 +383,6 @@ export class Subchannel {
}
}
addDisconnectListener(listener: () => void) {
this.disconnectListeners.add(listener);
}
removeDisconnectListener(listener: () => void) {
this.disconnectListeners.delete(listener);
}
/**
* Reset the backoff timeout, and immediately start connecting if in backoff.
*/
@ -976,4 +405,10 @@ export class Subchannel {
getRealSubchannel(): this {
return this;
}
throttleKeepalive(newKeepaliveTime: number) {
if (newKeepaliveTime > this.keepaliveTime) {
this.keepaliveTime = newKeepaliveTime;
}
}
}

View File

@ -0,0 +1,669 @@
/*
* Copyright 2023 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
import * as http2 from 'http2';
import { checkServerIdentity, CipherNameAndProtocol, ConnectionOptions, PeerCertificate, TLSSocket } from 'tls';
import { StatusObject } from './call-interface';
import { ChannelCredentials } from './channel-credentials';
import { ChannelOptions } from './channel-options';
import { ChannelzCallTracker, registerChannelzSocket, SocketInfo, SocketRef, TlsInfo, unregisterChannelzRef } from './channelz';
import { LogVerbosity } from './constants';
import { getProxiedConnection, ProxyConnectionResult } from './http_proxy';
import * as logging from './logging';
import { getDefaultAuthority } from './resolver';
import { stringToSubchannelAddress, SubchannelAddress, subchannelAddressToString } from './subchannel-address';
import { GrpcUri, parseUri, splitHostPort, uriToString } from './uri-parser';
import * as net from 'net';
import { Http2SubchannelCall, SubchannelCall, SubchannelCallInterceptingListener } from './subchannel-call';
import { Metadata } from './metadata';
import { getNextCallNumber } from './call-number';
const TRACER_NAME = 'transport';
const FLOW_CONTROL_TRACER_NAME = 'transport_flowctrl';
const clientVersion = require('../../package.json').version;
const {
HTTP2_HEADER_AUTHORITY,
HTTP2_HEADER_CONTENT_TYPE,
HTTP2_HEADER_METHOD,
HTTP2_HEADER_PATH,
HTTP2_HEADER_TE,
HTTP2_HEADER_USER_AGENT,
} = http2.constants;
/* setInterval and setTimeout only accept signed 32 bit integers. JS doesn't
* have a constant for the max signed 32 bit integer, so this is a simple way
* to calculate it */
const KEEPALIVE_MAX_TIME_MS = ~(1 << 31);
const KEEPALIVE_TIMEOUT_MS = 20000;
export interface CallEventTracker {
addMessageSent(): void;
addMessageReceived(): void;
onCallEnd(status: StatusObject): void;
onStreamEnd(success: boolean): void;
}
export interface TransportDisconnectListener {
(tooManyPings: boolean): void;
}
export interface Transport {
getChannelzRef(): SocketRef;
getPeerName(): string;
createCall(metadata: Metadata, host: string, method: string, listener: SubchannelCallInterceptingListener, subchannelCallStatsTracker: Partial<CallEventTracker>): SubchannelCall;
addDisconnectListener(listener: TransportDisconnectListener): void;
shutdown(): void;
}
const tooManyPingsData: Buffer = Buffer.from('too_many_pings', 'ascii');
class Http2Transport implements Transport {
/**
* The amount of time in between sending pings
*/
private keepaliveTimeMs: number = -1;
/**
* The amount of time to wait for an acknowledgement after sending a ping
*/
private keepaliveTimeoutMs: number = KEEPALIVE_TIMEOUT_MS;
/**
* Timer reference for timeout that indicates when to send the next ping
*/
private keepaliveIntervalId: NodeJS.Timer;
/**
* Timer reference tracking when the most recent ping will be considered lost
*/
private keepaliveTimeoutId: NodeJS.Timer | null = null;
/**
* Indicates whether keepalive pings should be sent without any active calls
*/
private keepaliveWithoutCalls = false;
private userAgent: string;
private activeCalls: Set<Http2SubchannelCall> = new Set();
private subchannelAddressString: string;
private disconnectListeners: TransportDisconnectListener[] = [];
private disconnectHandled = false;
// Channelz info
private channelzRef: SocketRef;
private readonly channelzEnabled: boolean = true;
/**
* Name of the remote server, if it is not the same as the subchannel
* address, i.e. if connecting through an HTTP CONNECT proxy.
*/
private remoteName: string | null = null;
private streamTracker = new ChannelzCallTracker();
private keepalivesSent = 0;
private messagesSent = 0;
private messagesReceived = 0;
private lastMessageSentTimestamp: Date | null = null;
private lastMessageReceivedTimestamp: Date | null = null;
constructor(
private session: http2.ClientHttp2Session,
subchannelAddress: SubchannelAddress,
options: ChannelOptions
) {
// Build user-agent string.
this.userAgent = [
options['grpc.primary_user_agent'],
`grpc-node-js/${clientVersion}`,
options['grpc.secondary_user_agent'],
]
.filter((e) => e)
.join(' '); // remove falsey values first
if ('grpc.keepalive_time_ms' in options) {
this.keepaliveTimeMs = options['grpc.keepalive_time_ms']!;
}
if ('grpc.keepalive_timeout_ms' in options) {
this.keepaliveTimeoutMs = options['grpc.keepalive_timeout_ms']!;
}
if ('grpc.keepalive_permit_without_calls' in options) {
this.keepaliveWithoutCalls =
options['grpc.keepalive_permit_without_calls'] === 1;
} else {
this.keepaliveWithoutCalls = false;
}
this.keepaliveIntervalId = setTimeout(() => {}, 0);
clearTimeout(this.keepaliveIntervalId);
if (this.keepaliveWithoutCalls) {
this.startKeepalivePings();
}
this.subchannelAddressString = subchannelAddressToString(subchannelAddress);
if (options['grpc.enable_channelz'] === 0) {
this.channelzEnabled = false;
}
this.channelzRef = registerChannelzSocket(this.subchannelAddressString, () => this.getChannelzInfo(), this.channelzEnabled);
session.once('close', () => {
this.trace('session closed');
this.stopKeepalivePings();
this.handleDisconnect();
});
session.once('goaway', (errorCode: number, lastStreamID: number, opaqueData: Buffer) => {
let tooManyPings = false;
/* See the last paragraph of
* https://github.com/grpc/proposal/blob/master/A8-client-side-keepalive.md#basic-keepalive */
if (
errorCode === http2.constants.NGHTTP2_ENHANCE_YOUR_CALM &&
opaqueData.equals(tooManyPingsData)
) {
tooManyPings = true;
}
this.trace(
'connection closed by GOAWAY with code ' +
errorCode
);
this.reportDisconnectToOwner(tooManyPings);
});
session.once('error', error => {
/* Do nothing here. Any error should also trigger a close event, which is
* where we want to handle that. */
this.trace(
'connection closed with error ' +
(error as Error).message
);
});
if (logging.isTracerEnabled(TRACER_NAME)) {
session.on('remoteSettings', (settings: http2.Settings) => {
this.trace(
'new settings received' +
(this.session !== session ? ' on the old connection' : '') +
': ' +
JSON.stringify(settings)
);
});
session.on('localSettings', (settings: http2.Settings) => {
this.trace(
'local settings acknowledged by remote' +
(this.session !== session ? ' on the old connection' : '') +
': ' +
JSON.stringify(settings)
);
});
}
}
private getChannelzInfo(): SocketInfo {
const sessionSocket = this.session.socket;
const remoteAddress = sessionSocket.remoteAddress ? stringToSubchannelAddress(sessionSocket.remoteAddress, sessionSocket.remotePort) : null;
const localAddress = sessionSocket.localAddress ? stringToSubchannelAddress(sessionSocket.localAddress, sessionSocket.localPort) : null;
let tlsInfo: TlsInfo | null;
if (this.session.encrypted) {
const tlsSocket: TLSSocket = sessionSocket as TLSSocket;
const cipherInfo: CipherNameAndProtocol & {standardName?: string} = tlsSocket.getCipher();
const certificate = tlsSocket.getCertificate();
const peerCertificate = tlsSocket.getPeerCertificate();
tlsInfo = {
cipherSuiteStandardName: cipherInfo.standardName ?? null,
cipherSuiteOtherName: cipherInfo.standardName ? null : cipherInfo.name,
localCertificate: (certificate && 'raw' in certificate) ? certificate.raw : null,
remoteCertificate: (peerCertificate && 'raw' in peerCertificate) ? peerCertificate.raw : null
};
} else {
tlsInfo = null;
}
const socketInfo: SocketInfo = {
remoteAddress: remoteAddress,
localAddress: localAddress,
security: tlsInfo,
remoteName: this.remoteName,
streamsStarted: this.streamTracker.callsStarted,
streamsSucceeded: this.streamTracker.callsSucceeded,
streamsFailed: this.streamTracker.callsFailed,
messagesSent: this.messagesSent,
messagesReceived: this.messagesReceived,
keepAlivesSent: this.keepalivesSent,
lastLocalStreamCreatedTimestamp: this.streamTracker.lastCallStartedTimestamp,
lastRemoteStreamCreatedTimestamp: null,
lastMessageSentTimestamp: this.lastMessageSentTimestamp,
lastMessageReceivedTimestamp: this.lastMessageReceivedTimestamp,
localFlowControlWindow: this.session.state.localWindowSize ?? null,
remoteFlowControlWindow: this.session.state.remoteWindowSize ?? null
};
return socketInfo;
}
private trace(text: string): void {
logging.trace(LogVerbosity.DEBUG, TRACER_NAME, '(' + this.channelzRef.id + ') ' + this.subchannelAddressString + ' ' + text);
}
private keepaliveTrace(text: string): void {
logging.trace(LogVerbosity.DEBUG, 'keepalive', '(' + this.channelzRef.id + ') ' + this.subchannelAddressString + ' ' + text);
}
private flowControlTrace(text: string): void {
logging.trace(LogVerbosity.DEBUG, FLOW_CONTROL_TRACER_NAME, '(' + this.channelzRef.id + ') ' + this.subchannelAddressString + ' ' + text);
}
private internalsTrace(text: string): void {
logging.trace(LogVerbosity.DEBUG, 'transport_internals', '(' + this.channelzRef.id + ') ' + this.subchannelAddressString + ' ' + text);
}
/**
* Indicate to the owner of this object that this transport should no longer
* be used. That happens if the connection drops, or if the server sends a
* GOAWAY.
* @param tooManyPings If true, this was triggered by a GOAWAY with data
* indicating that the session was closed becaues the client sent too many
* pings.
* @returns
*/
private reportDisconnectToOwner(tooManyPings: boolean) {
if (this.disconnectHandled) {
return;
}
this.disconnectHandled = true;
this.disconnectListeners.forEach(listener => listener(tooManyPings));
}
/**
* Handle connection drops, but not GOAWAYs.
*/
private handleDisconnect() {
this.reportDisconnectToOwner(false);
/* Give calls an event loop cycle to finish naturally before reporting the
* disconnnection to them. */
setImmediate(() => {
for (const call of this.activeCalls) {
call.onDisconnect();
}
});
}
addDisconnectListener(listener: TransportDisconnectListener): void {
this.disconnectListeners.push(listener);
}
private clearKeepaliveTimeout() {
if (!this.keepaliveTimeoutId) {
return;
}
clearTimeout(this.keepaliveTimeoutId);
this.keepaliveTimeoutId = null;
}
private sendPing() {
if (this.channelzEnabled) {
this.keepalivesSent += 1;
}
this.keepaliveTrace('Sending ping with timeout ' + this.keepaliveTimeoutMs + 'ms');
if (!this.keepaliveTimeoutId) {
this.keepaliveTimeoutId = setTimeout(() => {
this.keepaliveTrace('Ping timeout passed without response');
this.handleDisconnect();
}, this.keepaliveTimeoutMs);
this.keepaliveTimeoutId.unref?.();
}
try {
this.session!.ping(
(err: Error | null, duration: number, payload: Buffer) => {
this.keepaliveTrace('Received ping response');
this.clearKeepaliveTimeout();
}
);
} catch (e) {
/* If we fail to send a ping, the connection is no longer functional, so
* we should discard it. */
this.handleDisconnect();
}
}
private startKeepalivePings() {
if (this.keepaliveTimeMs < 0) {
return;
}
this.keepaliveIntervalId = setInterval(() => {
this.sendPing();
}, this.keepaliveTimeMs);
this.keepaliveIntervalId.unref?.();
/* Don't send a ping immediately because whatever caused us to start
* sending pings should also involve some network activity. */
}
/**
* Stop keepalive pings when terminating a connection. This discards the
* outstanding ping timeout, so it should not be called if the same
* connection will still be used.
*/
private stopKeepalivePings() {
clearInterval(this.keepaliveIntervalId);
this.clearKeepaliveTimeout();
}
private removeActiveCall(call: Http2SubchannelCall) {
this.activeCalls.delete(call);
if (this.activeCalls.size === 0) {
this.session.unref();
if (!this.keepaliveWithoutCalls) {
this.stopKeepalivePings();
}
}
}
private addActiveCall(call: Http2SubchannelCall) {
if (this.activeCalls.size === 0) {
this.session.ref();
if (!this.keepaliveWithoutCalls) {
this.startKeepalivePings();
}
}
this.activeCalls.add(call);
}
createCall(metadata: Metadata, host: string, method: string, listener: SubchannelCallInterceptingListener, subchannelCallStatsTracker: Partial<CallEventTracker>): Http2SubchannelCall {
const headers = metadata.toHttp2Headers();
headers[HTTP2_HEADER_AUTHORITY] = host;
headers[HTTP2_HEADER_USER_AGENT] = this.userAgent;
headers[HTTP2_HEADER_CONTENT_TYPE] = 'application/grpc';
headers[HTTP2_HEADER_METHOD] = 'POST';
headers[HTTP2_HEADER_PATH] = method;
headers[HTTP2_HEADER_TE] = 'trailers';
let http2Stream: http2.ClientHttp2Stream;
/* In theory, if an error is thrown by session.request because session has
* become unusable (e.g. because it has received a goaway), this subchannel
* should soon see the corresponding close or goaway event anyway and leave
* READY. But we have seen reports that this does not happen
* (https://github.com/googleapis/nodejs-firestore/issues/1023#issuecomment-653204096)
* so for defense in depth, we just discard the session when we see an
* error here.
*/
try {
http2Stream = this.session!.request(headers);
} catch (e) {
this.handleDisconnect();
throw e;
}
this.flowControlTrace(
'local window size: ' +
this.session.state.localWindowSize +
' remote window size: ' +
this.session.state.remoteWindowSize
);
this.internalsTrace(
'session.closed=' +
this.session.closed +
' session.destroyed=' +
this.session.destroyed +
' session.socket.destroyed=' +
this.session.socket.destroyed);
let eventTracker: CallEventTracker;
let call: Http2SubchannelCall;
if (this.channelzEnabled) {
this.streamTracker.addCallStarted();
eventTracker = {
addMessageSent: () => {
this.messagesSent += 1;
this.lastMessageSentTimestamp = new Date();
subchannelCallStatsTracker.addMessageSent?.();
},
addMessageReceived: () => {
this.messagesReceived += 1;
this.lastMessageReceivedTimestamp = new Date();
subchannelCallStatsTracker.addMessageReceived?.();
},
onCallEnd: status => {
subchannelCallStatsTracker.onCallEnd?.(status);
this.removeActiveCall(call);
},
onStreamEnd: success => {
if (success) {
this.streamTracker.addCallSucceeded();
} else {
this.streamTracker.addCallFailed();
}
subchannelCallStatsTracker.onStreamEnd?.(success);
}
}
} else {
eventTracker = {
addMessageSent: () => {
subchannelCallStatsTracker.addMessageSent?.();
},
addMessageReceived: () => {
subchannelCallStatsTracker.addMessageReceived?.();
},
onCallEnd: (status) => {
subchannelCallStatsTracker.onCallEnd?.(status);
this.removeActiveCall(call);
},
onStreamEnd: (success) => {
subchannelCallStatsTracker.onStreamEnd?.(success);
}
}
}
call = new Http2SubchannelCall(http2Stream, eventTracker, listener, this, getNextCallNumber());
this.addActiveCall(call);
return call;
}
getChannelzRef(): SocketRef {
return this.channelzRef;
}
getPeerName() {
return this.subchannelAddressString;
}
shutdown() {
this.session.close();
unregisterChannelzRef(this.channelzRef);
}
}
export interface SubchannelConnector {
connect(address: SubchannelAddress, credentials: ChannelCredentials, options: ChannelOptions): Promise<Transport>;
shutdown(): void;
}
export class Http2SubchannelConnector implements SubchannelConnector {
private session: http2.ClientHttp2Session | null = null;
private isShutdown = false;
constructor(private channelTarget: GrpcUri) {}
private trace(text: string) {
}
private createSession(address: SubchannelAddress, credentials: ChannelCredentials, options: ChannelOptions, proxyConnectionResult: ProxyConnectionResult): Promise<Http2Transport> {
if (this.isShutdown) {
return Promise.reject();
}
return new Promise<Http2Transport>((resolve, reject) => {
let remoteName: string | null;
if (proxyConnectionResult.realTarget) {
remoteName = uriToString(proxyConnectionResult.realTarget);
this.trace('creating HTTP/2 session through proxy to ' + uriToString(proxyConnectionResult.realTarget));
} else {
remoteName = null;
this.trace('creating HTTP/2 session to ' + subchannelAddressToString(address));
}
const targetAuthority = getDefaultAuthority(
proxyConnectionResult.realTarget ?? this.channelTarget
);
let connectionOptions: http2.SecureClientSessionOptions =
credentials._getConnectionOptions() || {};
connectionOptions.maxSendHeaderBlockLength = Number.MAX_SAFE_INTEGER;
if ('grpc-node.max_session_memory' in options) {
connectionOptions.maxSessionMemory = options[
'grpc-node.max_session_memory'
];
} else {
/* By default, set a very large max session memory limit, to effectively
* disable enforcement of the limit. Some testing indicates that Node's
* behavior degrades badly when this limit is reached, so we solve that
* by disabling the check entirely. */
connectionOptions.maxSessionMemory = Number.MAX_SAFE_INTEGER;
}
let addressScheme = 'http://';
if ('secureContext' in connectionOptions) {
addressScheme = 'https://';
// If provided, the value of grpc.ssl_target_name_override should be used
// to override the target hostname when checking server identity.
// This option is used for testing only.
if (options['grpc.ssl_target_name_override']) {
const sslTargetNameOverride = options[
'grpc.ssl_target_name_override'
]!;
connectionOptions.checkServerIdentity = (
host: string,
cert: PeerCertificate
): Error | undefined => {
return checkServerIdentity(sslTargetNameOverride, cert);
};
connectionOptions.servername = sslTargetNameOverride;
} else {
const authorityHostname =
splitHostPort(targetAuthority)?.host ?? 'localhost';
// We want to always set servername to support SNI
connectionOptions.servername = authorityHostname;
}
if (proxyConnectionResult.socket) {
/* This is part of the workaround for
* https://github.com/nodejs/node/issues/32922. Without that bug,
* proxyConnectionResult.socket would always be a plaintext socket and
* this would say
* connectionOptions.socket = proxyConnectionResult.socket; */
connectionOptions.createConnection = (authority, option) => {
return proxyConnectionResult.socket!;
};
}
} else {
/* In all but the most recent versions of Node, http2.connect does not use
* the options when establishing plaintext connections, so we need to
* establish that connection explicitly. */
connectionOptions.createConnection = (authority, option) => {
if (proxyConnectionResult.socket) {
return proxyConnectionResult.socket;
} else {
/* net.NetConnectOpts is declared in a way that is more restrictive
* than what net.connect will actually accept, so we use the type
* assertion to work around that. */
return net.connect(address);
}
};
}
connectionOptions = {
...connectionOptions,
...address,
};
/* http2.connect uses the options here:
* https://github.com/nodejs/node/blob/70c32a6d190e2b5d7b9ff9d5b6a459d14e8b7d59/lib/internal/http2/core.js#L3028-L3036
* The spread operator overides earlier values with later ones, so any port
* or host values in the options will be used rather than any values extracted
* from the first argument. In addition, the path overrides the host and port,
* as documented for plaintext connections here:
* https://nodejs.org/api/net.html#net_socket_connect_options_connectlistener
* and for TLS connections here:
* https://nodejs.org/api/tls.html#tls_tls_connect_options_callback. In
* earlier versions of Node, http2.connect passes these options to
* tls.connect but not net.connect, so in the insecure case we still need
* to set the createConnection option above to create the connection
* explicitly. We cannot do that in the TLS case because http2.connect
* passes necessary additional options to tls.connect.
* The first argument just needs to be parseable as a URL and the scheme
* determines whether the connection will be established over TLS or not.
*/
const session = http2.connect(
addressScheme + targetAuthority,
connectionOptions
);
this.session = session;
session.unref();
session.once('connect', () => {
session.removeAllListeners();
resolve(new Http2Transport(session, address, options));
this.session = null;
});
session.once('close', () => {
this.session = null;
reject();
});
session.once('error', error => {
this.trace('connection failed with error ' + (error as Error).message)
});
});
}
connect(address: SubchannelAddress, credentials: ChannelCredentials, options: ChannelOptions): Promise<Http2Transport> {
if (this.isShutdown) {
return Promise.reject();
}
/* Pass connection options through to the proxy so that it's able to
* upgrade it's connection to support tls if needed.
* This is a workaround for https://github.com/nodejs/node/issues/32922
* See https://github.com/grpc/grpc-node/pull/1369 for more info. */
const connectionOptions: ConnectionOptions =
credentials._getConnectionOptions() || {};
if ('secureContext' in connectionOptions) {
connectionOptions.ALPNProtocols = ['h2'];
// If provided, the value of grpc.ssl_target_name_override should be used
// to override the target hostname when checking server identity.
// This option is used for testing only.
if (options['grpc.ssl_target_name_override']) {
const sslTargetNameOverride = options[
'grpc.ssl_target_name_override'
]!;
connectionOptions.checkServerIdentity = (
host: string,
cert: PeerCertificate
): Error | undefined => {
return checkServerIdentity(sslTargetNameOverride, cert);
};
connectionOptions.servername = sslTargetNameOverride;
} else {
if ('grpc.http_connect_target' in options) {
/* This is more or less how servername will be set in createSession
* if a connection is successfully established through the proxy.
* If the proxy is not used, these connectionOptions are discarded
* anyway */
const targetPath = getDefaultAuthority(
parseUri(options['grpc.http_connect_target'] as string) ?? {
path: 'localhost',
}
);
const hostPort = splitHostPort(targetPath);
connectionOptions.servername = hostPort?.host ?? targetPath;
}
}
}
return getProxiedConnection(
address,
options,
connectionOptions
).then(
result => this.createSession(address, credentials, options, result)
);
}
shutdown(): void {
this.isShutdown = true;
this.session?.close();
this.session = null;
}
}

View File

@ -207,6 +207,64 @@ describe('Name Resolver', () => {
const resolver = resolverManager.createResolver(target, listener, {});
resolver.updateResolution();
});
// Created DNS TXT record using TXT sample from https://github.com/grpc/proposal/blob/master/A2-service-configs-in-dns.md
// "grpc_config=[{\"serviceConfig\":{\"loadBalancingPolicy\":\"round_robin\",\"methodConfig\":[{\"name\":[{\"service\":\"MyService\",\"method\":\"Foo\"}],\"waitForReady\":true}]}}]"
it.skip('Should resolve a name with TXT service config', done => {
const target = resolverManager.mapUriDefaultScheme(parseUri('grpctest.kleinsch.com')!)!;
const listener: resolverManager.ResolverListener = {
onSuccessfulResolution: (
addressList: SubchannelAddress[],
serviceConfig: ServiceConfig | null,
serviceConfigError: StatusObject | null
) => {
if (serviceConfig !== null) {
assert(
serviceConfig.loadBalancingPolicy === 'round_robin',
'Should have found round robin LB policy'
);
done();
}
},
onError: (error: StatusObject) => {
done(new Error(`Failed with status ${error.details}`));
},
};
const resolver = resolverManager.createResolver(target, listener, {});
resolver.updateResolution();
});
it.skip(
'Should not resolve TXT service config if we disabled service config',
(done) => {
const target = resolverManager.mapUriDefaultScheme(
parseUri('grpctest.kleinsch.com')!
)!;
let count = 0;
const listener: resolverManager.ResolverListener = {
onSuccessfulResolution: (
addressList: SubchannelAddress[],
serviceConfig: ServiceConfig | null,
serviceConfigError: StatusObject | null
) => {
assert(
serviceConfig === null,
'Should not have found service config'
);
count++;
},
onError: (error: StatusObject) => {
done(new Error(`Failed with status ${error.details}`));
},
};
const resolver = resolverManager.createResolver(target, listener, {
'grpc.service_config_disable_resolution': 1,
});
resolver.updateResolution();
setTimeout(() => {
assert(count === 1, 'Should have only resolved once');
done();
}, 2_000);
}
);
/* The DNS entry for loopback4.unittest.grpc.io only has a single A record
* with the address 127.0.0.1, but the Mac DNS resolver appears to use
* NAT64 to create an IPv6 address in that case, so it instead returns

View File

@ -27,9 +27,9 @@ import * as grpc from '../src';
import { Server, ServerCredentials } from '../src';
import { ServiceError } from '../src/call';
import { ServiceClient, ServiceClientConstructor } from '../src/make-client';
import { sendUnaryData, ServerUnaryCall } from '../src/server-call';
import { sendUnaryData, ServerUnaryCall, ServerDuplexStream } from '../src/server-call';
import { loadProtoFile } from './common';
import { assert2, loadProtoFile } from './common';
import { TestServiceClient, TestServiceHandlers } from './generated/TestService';
import { ProtoGrpcType as TestServiceGrpcType } from './generated/test_service';
import { Request__Output } from './generated/Request';
@ -458,18 +458,28 @@ describe('Server', () => {
describe('Echo service', () => {
let server: Server;
let client: ServiceClient;
const protoFile = path.join(__dirname, 'fixtures', 'echo_service.proto');
const echoService = loadProtoFile(protoFile)
.EchoService as ServiceClientConstructor;
const serviceImplementation = {
echo(call: ServerUnaryCall<any, any>, callback: sendUnaryData<any>) {
callback(null, call.request);
},
echoBidiStream(call: ServerDuplexStream<any, any>) {
call.on('data', data => {
call.write(data);
});
call.on('end', () => {
call.end();
});
}
};
before(done => {
const protoFile = path.join(__dirname, 'fixtures', 'echo_service.proto');
const echoService = loadProtoFile(protoFile)
.EchoService as ServiceClientConstructor;
server = new Server();
server.addService(echoService.service, {
echo(call: ServerUnaryCall<any, any>, callback: sendUnaryData<any>) {
callback(null, call.request);
},
});
server.addService(echoService.service, serviceImplementation);
server.bindAsync(
'localhost:0',
@ -501,6 +511,43 @@ describe('Echo service', () => {
}
);
});
/* This test passes on Node 18 but fails on Node 16. The failure appears to
* be caused by https://github.com/nodejs/node/issues/42713 */
it.skip('should continue a stream after server shutdown', done => {
const server2 = new Server();
server2.addService(echoService.service, serviceImplementation);
server2.bindAsync('localhost:0', ServerCredentials.createInsecure(), (err, port) => {
if (err) {
done(err);
return;
}
const client2 = new echoService(`localhost:${port}`, grpc.credentials.createInsecure());
server2.start();
const stream = client2.echoBidiStream();
const totalMessages = 5;
let messagesSent = 0;
stream.write({ value: 'test value', value2: messagesSent});
messagesSent += 1;
stream.on('data', () => {
if (messagesSent === 1) {
server2.tryShutdown(assert2.mustCall(() => {}));
}
if (messagesSent >= totalMessages) {
stream.end();
} else {
stream.write({ value: 'test value', value2: messagesSent});
messagesSent += 1;
}
});
stream.on('status', assert2.mustCall((status: grpc.StatusObject) => {
assert.strictEqual(status.code, grpc.status.OK);
assert.strictEqual(messagesSent, totalMessages);
}));
stream.on('error', () => {});
assert2.afterMustCallsSatisfied(done);
});
});
});
describe('Generic client and server', () => {