diff --git a/packages/grpc-js-xds/package.json b/packages/grpc-js-xds/package.json index bd1511e5..9511776c 100644 --- a/packages/grpc-js-xds/package.json +++ b/packages/grpc-js-xds/package.json @@ -1,6 +1,6 @@ { "name": "@grpc/grpc-js-xds", - "version": "1.8.0", + "version": "1.8.1", "description": "Plugin for @grpc/grpc-js. Adds the xds:// URL scheme and associated features.", "main": "build/src/index.js", "scripts": { diff --git a/packages/grpc-js-xds/src/xds-client.ts b/packages/grpc-js-xds/src/xds-client.ts index a2d33d1a..20c914a1 100644 --- a/packages/grpc-js-xds/src/xds-client.ts +++ b/packages/grpc-js-xds/src/xds-client.ts @@ -342,11 +342,13 @@ export class XdsClient { this.adsNode = { ...bootstrapInfo.node, user_agent_name: userAgentName, + user_agent_version: clientVersion, client_features: ['envoy.lb.does_not_support_overprovisioning'], }; this.lrsNode = { ...bootstrapInfo.node, user_agent_name: userAgentName, + user_agent_version: clientVersion, client_features: ['envoy.lrs.supports_send_all_clusters'], }; setCsdsClientNode(this.adsNode); diff --git a/packages/grpc-js-xds/src/xds-stream-state/rds-state.ts b/packages/grpc-js-xds/src/xds-stream-state/rds-state.ts index 891eb7c8..fef69451 100644 --- a/packages/grpc-js-xds/src/xds-stream-state/rds-state.ts +++ b/packages/grpc-js-xds/src/xds-stream-state/rds-state.ts @@ -32,6 +32,8 @@ const SUPPPORTED_HEADER_MATCH_SPECIFIERS = [ 'suffix_match']; const SUPPORTED_CLUSTER_SPECIFIERS = ['cluster', 'weighted_clusters', 'cluster_header']; +const UINT32_MAX = 0xFFFFFFFF; + function durationToMs(duration: Duration__Output | null): number | null { if (duration === null) { return null; @@ -130,14 +132,11 @@ export class RdsState extends BaseXdsStreamState imp } } if (route.route!.cluster_specifier === 'weighted_clusters') { - if (route.route.weighted_clusters!.total_weight?.value === 0) { - return false; - } let weightSum = 0; for (const clusterWeight of route.route.weighted_clusters!.clusters) { weightSum += clusterWeight.weight?.value ?? 0; } - if (weightSum !== route.route.weighted_clusters!.total_weight?.value ?? 100) { + if (weightSum === 0 || weightSum > UINT32_MAX) { return false; } if (EXPERIMENTAL_FAULT_INJECTION) { diff --git a/packages/grpc-js/README.md b/packages/grpc-js/README.md index 3d698dfd..652ce5fe 100644 --- a/packages/grpc-js/README.md +++ b/packages/grpc-js/README.md @@ -62,6 +62,7 @@ Many channel arguments supported in `grpc` are not supported in `@grpc/grpc-js`. - `grpc.enable_retries` - `grpc.per_rpc_retry_buffer_size` - `grpc.retry_buffer_size` + - `grpc.service_config_disable_resolution` - `grpc-node.max_session_memory` - `channelOverride` - `channelFactoryOverride` diff --git a/packages/grpc-js/package.json b/packages/grpc-js/package.json index 9aa68574..815dabeb 100644 --- a/packages/grpc-js/package.json +++ b/packages/grpc-js/package.json @@ -1,6 +1,6 @@ { "name": "@grpc/grpc-js", - "version": "1.8.0", + "version": "1.8.13", "description": "gRPC Library for Node - pure JS implementation", "homepage": "https://grpc.io/", "repository": "https://github.com/grpc/grpc-node/tree/master/packages/grpc-js", diff --git a/packages/grpc-js/src/channel-options.ts b/packages/grpc-js/src/channel-options.ts index 8830ed43..a41b89e9 100644 --- a/packages/grpc-js/src/channel-options.ts +++ b/packages/grpc-js/src/channel-options.ts @@ -55,6 +55,7 @@ export interface ChannelOptions { 'grpc.max_connection_age_ms'?: number; 'grpc.max_connection_age_grace_ms'?: number; 'grpc-node.max_session_memory'?: number; + 'grpc.service_config_disable_resolution'?: number; // eslint-disable-next-line @typescript-eslint/no-explicit-any [key: string]: any; } @@ -87,6 +88,7 @@ export const recognizedOptions = { 'grpc.max_connection_age_ms': true, 'grpc.max_connection_age_grace_ms': true, 'grpc-node.max_session_memory': true, + 'grpc.service_config_disable_resolution': true, }; export function channelOptionsEqual( diff --git a/packages/grpc-js/src/deadline.ts b/packages/grpc-js/src/deadline.ts index 58ea0a80..be1f3c3b 100644 --- a/packages/grpc-js/src/deadline.ts +++ b/packages/grpc-js/src/deadline.ts @@ -51,8 +51,45 @@ export function getDeadlineTimeoutString(deadline: Deadline) { throw new Error('Deadline is too far in the future') } +/** + * See https://nodejs.org/api/timers.html#settimeoutcallback-delay-args + * In particular, "When delay is larger than 2147483647 or less than 1, the + * delay will be set to 1. Non-integer delays are truncated to an integer." + * This number of milliseconds is almost 25 days. + */ +const MAX_TIMEOUT_TIME = 2147483647; + +/** + * Get the timeout value that should be passed to setTimeout now for the timer + * to end at the deadline. For any deadline before now, the timer should end + * immediately, represented by a value of 0. For any deadline more than + * MAX_TIMEOUT_TIME milliseconds in the future, a timer cannot be set that will + * end at that time, so it is treated as infinitely far in the future. + * @param deadline + * @returns + */ export function getRelativeTimeout(deadline: Deadline) { const deadlineMs = deadline instanceof Date ? deadline.getTime() : deadline; const now = new Date().getTime(); - return deadlineMs - now; + const timeout = deadlineMs - now; + if (timeout < 0) { + return 0; + } else if (timeout > MAX_TIMEOUT_TIME) { + return Infinity + } else { + return timeout; + } +} + +export function deadlineToString(deadline: Deadline): string { + if (deadline instanceof Date) { + return deadline.toISOString(); + } else { + const dateDeadline = new Date(deadline); + if (Number.isNaN(dateDeadline.getTime())) { + return '' + deadline; + } else { + return dateDeadline.toISOString(); + } + } } \ No newline at end of file diff --git a/packages/grpc-js/src/index.ts b/packages/grpc-js/src/index.ts index 786fa992..51f39478 100644 --- a/packages/grpc-js/src/index.ts +++ b/packages/grpc-js/src/index.ts @@ -237,7 +237,7 @@ export const getClientChannel = (client: Client) => { export { StatusBuilder }; -export { Listener } from './call-interface'; +export { Listener, InterceptingListener } from './call-interface'; export { Requester, @@ -248,6 +248,7 @@ export { InterceptorProvider, InterceptingCall, InterceptorConfigurationError, + NextCall } from './client-interceptors'; export { diff --git a/packages/grpc-js/src/internal-channel.ts b/packages/grpc-js/src/internal-channel.ts index 9137be27..14038bd3 100644 --- a/packages/grpc-js/src/internal-channel.ts +++ b/packages/grpc-js/src/internal-channel.ts @@ -46,11 +46,12 @@ import { LoadBalancingCall } from './load-balancing-call'; import { CallCredentials } from './call-credentials'; import { Call, CallStreamOptions, InterceptingListener, MessageContext, StatusObject } from './call-interface'; import { SubchannelCall } from './subchannel-call'; -import { Deadline, getDeadlineTimeoutString } from './deadline'; +import { Deadline, deadlineToString, getDeadlineTimeoutString } from './deadline'; import { ResolvingCall } from './resolving-call'; import { getNextCallNumber } from './call-number'; import { restrictControlPlaneStatusCode } from './control-plane-status'; import { MessageBufferTracker, RetryingCall, RetryThrottler } from './retrying-call'; +import { BaseSubchannelWrapper, ConnectivityStateListener, SubchannelInterface } from './subchannel-interface'; /** * See https://nodejs.org/api/timers.html#timers_setinterval_callback_delay_args @@ -84,6 +85,32 @@ const RETRY_THROTTLER_MAP: Map = new Map(); const DEFAULT_RETRY_BUFFER_SIZE_BYTES = 1<<24; // 16 MB const DEFAULT_PER_RPC_RETRY_BUFFER_SIZE_BYTES = 1<<20; // 1 MB +class ChannelSubchannelWrapper extends BaseSubchannelWrapper implements SubchannelInterface { + private refCount = 0; + private subchannelStateListener: ConnectivityStateListener; + constructor(childSubchannel: SubchannelInterface, private channel: InternalChannel) { + super(childSubchannel); + this.subchannelStateListener = (subchannel, previousState, newState, keepaliveTime) => { + channel.throttleKeepalive(keepaliveTime); + }; + childSubchannel.addConnectivityStateListener(this.subchannelStateListener); + } + + ref(): void { + this.child.ref(); + this.refCount += 1; + } + + unref(): void { + this.child.unref(); + this.refCount -= 1; + if (this.refCount <= 0) { + this.child.removeConnectivityStateListener(this.subchannelStateListener); + this.channel.removeWrappedSubchannel(this); + } + } +} + export class InternalChannel { private resolvingLoadBalancer: ResolvingLoadBalancer; @@ -116,8 +143,10 @@ export class InternalChannel { * configSelector becomes set or the channel state becomes anything other * than TRANSIENT_FAILURE. */ - private currentResolutionError: StatusObject | null = null; - private retryBufferTracker: MessageBufferTracker; + private currentResolutionError: StatusObject | null = null; + private retryBufferTracker: MessageBufferTracker; + private keepaliveTime: number; + private wrappedSubchannels: Set = new Set(); // Channelz info private readonly channelzEnabled: boolean = true; @@ -190,6 +219,7 @@ export class InternalChannel { options['grpc.retry_buffer_size'] ?? DEFAULT_RETRY_BUFFER_SIZE_BYTES, options['grpc.per_rpc_retry_buffer_size'] ?? DEFAULT_PER_RPC_RETRY_BUFFER_SIZE_BYTES ); + this.keepaliveTime = options['grpc.keepalive_time_ms'] ?? -1; const channelControlHelper: ChannelControlHelper = { createSubchannel: ( subchannelAddress: SubchannelAddress, @@ -201,10 +231,13 @@ export class InternalChannel { Object.assign({}, this.options, subchannelArgs), this.credentials ); + subchannel.throttleKeepalive(this.keepaliveTime); if (this.channelzEnabled) { this.channelzTrace.addTrace('CT_INFO', 'Created subchannel or used existing subchannel', subchannel.getChannelzRef()); } - return subchannel; + const wrappedSubchannel = new ChannelSubchannelWrapper(subchannel, this); + this.wrappedSubchannels.add(wrappedSubchannel); + return wrappedSubchannel; }, updateState: (connectivityState: ConnectivityState, picker: Picker) => { this.currentPicker = picker; @@ -369,6 +402,19 @@ export class InternalChannel { } } + throttleKeepalive(newKeepaliveTime: number) { + if (newKeepaliveTime > this.keepaliveTime) { + this.keepaliveTime = newKeepaliveTime; + for (const wrappedSubchannel of this.wrappedSubchannels) { + wrappedSubchannel.throttleKeepalive(newKeepaliveTime); + } + } + } + + removeWrappedSubchannel(wrappedSubchannel: ChannelSubchannelWrapper) { + this.wrappedSubchannels.delete(wrappedSubchannel); + } + doPick(metadata: Metadata, extraPickInfo: {[key: string]: string}) { return this.currentPicker.pick({metadata: metadata, extraPickInfo: extraPickInfo}); } @@ -469,7 +515,7 @@ export class InternalChannel { '] method="' + method + '", deadline=' + - deadline + deadlineToString(deadline) ); const finalOptions: CallStreamOptions = { deadline: deadline, diff --git a/packages/grpc-js/src/load-balancer-outlier-detection.ts b/packages/grpc-js/src/load-balancer-outlier-detection.ts index cdf58052..2f72a962 100644 --- a/packages/grpc-js/src/load-balancer-outlier-detection.ts +++ b/packages/grpc-js/src/load-balancer-outlier-detection.ts @@ -205,11 +205,11 @@ class OutlierDetectionSubchannelWrapper extends BaseSubchannelWrapper implements constructor(childSubchannel: SubchannelInterface, private mapEntry?: MapEntry) { super(childSubchannel); this.childSubchannelState = childSubchannel.getConnectivityState(); - childSubchannel.addConnectivityStateListener((subchannel, previousState, newState) => { + childSubchannel.addConnectivityStateListener((subchannel, previousState, newState, keepaliveTime) => { this.childSubchannelState = newState; if (!this.ejected) { for (const listener of this.stateListeners) { - listener(this, previousState, newState); + listener(this, previousState, newState, keepaliveTime); } } }); @@ -265,14 +265,14 @@ class OutlierDetectionSubchannelWrapper extends BaseSubchannelWrapper implements eject() { this.ejected = true; for (const listener of this.stateListeners) { - listener(this, this.childSubchannelState, ConnectivityState.TRANSIENT_FAILURE); + listener(this, this.childSubchannelState, ConnectivityState.TRANSIENT_FAILURE, -1); } } uneject() { this.ejected = false; for (const listener of this.stateListeners) { - listener(this, ConnectivityState.TRANSIENT_FAILURE, this.childSubchannelState); + listener(this, ConnectivityState.TRANSIENT_FAILURE, this.childSubchannelState, -1); } } diff --git a/packages/grpc-js/src/load-balancer-pick-first.ts b/packages/grpc-js/src/load-balancer-pick-first.ts index 027eae07..a501b1f7 100644 --- a/packages/grpc-js/src/load-balancer-pick-first.ts +++ b/packages/grpc-js/src/load-balancer-pick-first.ts @@ -33,6 +33,7 @@ import { } from './picker'; import { SubchannelAddress, + subchannelAddressEqual, subchannelAddressToString, } from './subchannel-address'; import * as logging from './logging'; @@ -168,7 +169,7 @@ export class PickFirstLoadBalancer implements LoadBalancer { * connecting to the next one instead of waiting for the connection * delay timer. */ if ( - subchannel === this.subchannels[this.currentSubchannelIndex] && + subchannel.getRealSubchannel() === this.subchannels[this.currentSubchannelIndex].getRealSubchannel() && newState === ConnectivityState.TRANSIENT_FAILURE ) { this.startNextSubchannelConnecting(); @@ -419,8 +420,9 @@ export class PickFirstLoadBalancer implements LoadBalancer { * address list is different from the existing one */ if ( this.subchannels.length === 0 || + this.latestAddressList.length !== addressList.length || !this.latestAddressList.every( - (value, index) => addressList[index] === value + (value, index) => addressList[index] && subchannelAddressEqual(addressList[index], value) ) ) { this.latestAddressList = addressList; diff --git a/packages/grpc-js/src/load-balancing-call.ts b/packages/grpc-js/src/load-balancing-call.ts index 48aaf48a..f7493398 100644 --- a/packages/grpc-js/src/load-balancing-call.ts +++ b/packages/grpc-js/src/load-balancing-call.ts @@ -102,6 +102,7 @@ export class LoadBalancingCall implements Call { if (!this.metadata) { throw new Error('doPick called before start'); } + this.trace('Pick called') const pickResult = this.channel.doPick(this.metadata, this.callConfig.pickInformation); const subchannelString = pickResult.subchannel ? '(' + pickResult.subchannel.getChannelzRef().id + ') ' + pickResult.subchannel.getAddress() : diff --git a/packages/grpc-js/src/resolver-dns.ts b/packages/grpc-js/src/resolver-dns.ts index 4b7cb2fe..355ce2df 100644 --- a/packages/grpc-js/src/resolver-dns.ts +++ b/packages/grpc-js/src/resolver-dns.ts @@ -98,6 +98,7 @@ class DnsResolver implements Resolver { private continueResolving = false; private nextResolutionTimer: NodeJS.Timer; private isNextResolutionTimerRunning = false; + private isServiceConfigEnabled = true; constructor( private target: GrpcUri, private listener: ResolverListener, @@ -127,6 +128,10 @@ class DnsResolver implements Resolver { } this.percentage = Math.random() * 100; + if (channelOptions['grpc.service_config_disable_resolution'] === 1) { + this.isServiceConfigEnabled = false; + } + this.defaultResolutionError = { code: Status.UNAVAILABLE, details: `Name resolution failed for target ${uriToString(this.target)}`, @@ -255,7 +260,7 @@ class DnsResolver implements Resolver { ); /* If there already is a still-pending TXT resolution, we can just use * that result when it comes in */ - if (this.pendingTxtPromise === null) { + if (this.isServiceConfigEnabled && this.pendingTxtPromise === null) { /* We handle the TXT query promise differently than the others because * the name resolution attempt as a whole is a success even if the TXT * lookup fails */ diff --git a/packages/grpc-js/src/resolving-call.ts b/packages/grpc-js/src/resolving-call.ts index fe29a4f7..f29fb7fd 100644 --- a/packages/grpc-js/src/resolving-call.ts +++ b/packages/grpc-js/src/resolving-call.ts @@ -18,7 +18,7 @@ import { CallCredentials } from "./call-credentials"; import { Call, CallStreamOptions, InterceptingListener, MessageContext, StatusObject } from "./call-interface"; import { LogVerbosity, Propagate, Status } from "./constants"; -import { Deadline, getDeadlineTimeoutString, getRelativeTimeout, minDeadline } from "./deadline"; +import { Deadline, deadlineToString, getDeadlineTimeoutString, getRelativeTimeout, minDeadline } from "./deadline"; import { FilterStack, FilterStackFactory } from "./filter-stack"; import { InternalChannel } from "./internal-channel"; import { Metadata } from "./metadata"; @@ -79,9 +79,9 @@ export class ResolvingCall implements Call { private runDeadlineTimer() { clearTimeout(this.deadlineTimer); - this.trace('Deadline: ' + this.deadline); - if (this.deadline !== Infinity) { - const timeout = getRelativeTimeout(this.deadline); + this.trace('Deadline: ' + deadlineToString(this.deadline)); + const timeout = getRelativeTimeout(this.deadline); + if (timeout !== Infinity) { this.trace('Deadline will be reached in ' + timeout + 'ms'); const handleDeadline = () => { this.cancelWithStatus( @@ -103,6 +103,7 @@ export class ResolvingCall implements Call { if (!this.filterStack) { this.filterStack = this.filterStackFactory.createFilter(); } + clearTimeout(this.deadlineTimer); const filteredStatus = this.filterStack.receiveTrailers(status); this.trace('ended with status: code=' + filteredStatus.code + ' details="' + filteredStatus.details + '"'); this.statusWatchers.forEach(watcher => watcher(filteredStatus)); @@ -177,13 +178,17 @@ export class ResolvingCall implements Call { this.filterStack = this.filterStackFactory.createFilter(); this.filterStack.sendMetadata(Promise.resolve(this.metadata)).then(filteredMetadata => { this.child = this.channel.createInnerCall(config, this.method, this.host, this.credentials, this.deadline); + this.trace('Created child [' + this.child.getCallNumber() + ']') this.child.start(filteredMetadata, { onReceiveMetadata: metadata => { + this.trace('Received metadata') this.listener!.onReceiveMetadata(this.filterStack!.receiveMetadata(metadata)); }, onReceiveMessage: message => { + this.trace('Received message'); this.readFilterPending = true; this.filterStack!.receiveMessage(message).then(filteredMesssage => { + this.trace('Finished filtering received message'); this.readFilterPending = false; this.listener!.onReceiveMessage(filteredMesssage); if (this.pendingChildStatus) { @@ -194,6 +199,7 @@ export class ResolvingCall implements Call { }); }, onReceiveStatus: status => { + this.trace('Received status'); if (this.readFilterPending) { this.pendingChildStatus = status; } else { diff --git a/packages/grpc-js/src/retrying-call.ts b/packages/grpc-js/src/retrying-call.ts index f9bda9eb..5ae585b9 100644 --- a/packages/grpc-js/src/retrying-call.ts +++ b/packages/grpc-js/src/retrying-call.ts @@ -151,6 +151,12 @@ export class RetryingCall implements Call { private initialMetadata: Metadata | null = null; private underlyingCalls: UnderlyingCall[] = []; private writeBuffer: WriteBufferEntry[] = []; + /** + * The offset of message indices in the writeBuffer. For example, if + * writeBufferOffset is 10, message 10 is in writeBuffer[0] and message 15 + * is in writeBuffer[5]. + */ + private writeBufferOffset = 0; /** * Tracks whether a read has been started, so that we know whether to start * reads on new child calls. This only matters for the first read, because @@ -202,8 +208,16 @@ export class RetryingCall implements Call { private reportStatus(statusObject: StatusObject) { this.trace('ended with status: code=' + statusObject.code + ' details="' + statusObject.details + '"'); + this.bufferTracker.freeAll(this.callNumber); + this.writeBufferOffset = this.writeBufferOffset + this.writeBuffer.length; + this.writeBuffer = []; process.nextTick(() => { - this.listener?.onReceiveStatus(statusObject); + // Explicitly construct status object to remove progress field + this.listener?.onReceiveStatus({ + code: statusObject.code, + details: statusObject.details, + metadata: statusObject.metadata + }); }); } @@ -222,20 +236,27 @@ export class RetryingCall implements Call { } } - private maybefreeMessageBufferEntry(messageIndex: number) { + private getBufferEntry(messageIndex: number): WriteBufferEntry { + return this.writeBuffer[messageIndex - this.writeBufferOffset] ?? {entryType: 'FREED', allocated: false}; + } + + private getNextBufferIndex() { + return this.writeBufferOffset + this.writeBuffer.length; + } + + private clearSentMessages() { if (this.state !== 'COMMITTED') { return; } - const bufferEntry = this.writeBuffer[messageIndex]; - if (bufferEntry.entryType === 'MESSAGE') { + const earliestNeededMessageIndex = this.underlyingCalls[this.committedCallIndex!].nextMessageToSend; + for (let messageIndex = this.writeBufferOffset; messageIndex < earliestNeededMessageIndex; messageIndex++) { + const bufferEntry = this.getBufferEntry(messageIndex); if (bufferEntry.allocated) { this.bufferTracker.free(bufferEntry.message!.message.length, this.callNumber); } - this.writeBuffer[messageIndex] = { - entryType: 'FREED', - allocated: false - }; } + this.writeBuffer = this.writeBuffer.slice(earliestNeededMessageIndex - this.writeBufferOffset); + this.writeBufferOffset = earliestNeededMessageIndex; } private commitCall(index: number) { @@ -258,21 +279,28 @@ export class RetryingCall implements Call { this.underlyingCalls[i].state = 'COMPLETED'; this.underlyingCalls[i].call.cancelWithStatus(Status.CANCELLED, 'Discarded in favor of other hedged attempt'); } - for (let messageIndex = 0; messageIndex < this.underlyingCalls[index].nextMessageToSend - 1; messageIndex += 1) { - this.maybefreeMessageBufferEntry(messageIndex); - } + this.clearSentMessages(); } private commitCallWithMostMessages() { + if (this.state === 'COMMITTED') { + return; + } let mostMessages = -1; let callWithMostMessages = -1; for (const [index, childCall] of this.underlyingCalls.entries()) { - if (childCall.nextMessageToSend > mostMessages) { + if (childCall.state === 'ACTIVE' && childCall.nextMessageToSend > mostMessages) { mostMessages = childCall.nextMessageToSend; callWithMostMessages = index; } } - this.commitCall(callWithMostMessages); + if (callWithMostMessages === -1) { + /* There are no active calls, disable retries to force the next call that + * is started to be committed. */ + this.state = 'TRANSPARENT_ONLY'; + } else { + this.commitCall(callWithMostMessages); + } } private isStatusCodeInList(list: (Status | string)[], code: Status) { @@ -532,8 +560,8 @@ export class RetryingCall implements Call { private handleChildWriteCompleted(childIndex: number) { const childCall = this.underlyingCalls[childIndex]; const messageIndex = childCall.nextMessageToSend; - this.writeBuffer[messageIndex].callback?.(); - this.maybefreeMessageBufferEntry(messageIndex); + this.getBufferEntry(messageIndex).callback?.(); + this.clearSentMessages(); childCall.nextMessageToSend += 1; this.sendNextChildMessage(childIndex); } @@ -543,10 +571,10 @@ export class RetryingCall implements Call { if (childCall.state === 'COMPLETED') { return; } - if (this.writeBuffer[childCall.nextMessageToSend]) { - const bufferEntry = this.writeBuffer[childCall.nextMessageToSend]; + if (this.getBufferEntry(childCall.nextMessageToSend)) { + const bufferEntry = this.getBufferEntry(childCall.nextMessageToSend); switch (bufferEntry.entryType) { - case 'MESSAGE': + case 'MESSAGE': childCall.call.sendMessageWithContext({ callback: (error) => { // Ignore error @@ -571,13 +599,13 @@ export class RetryingCall implements Call { message, flags: context.flags, }; - const messageIndex = this.writeBuffer.length; + const messageIndex = this.getNextBufferIndex(); const bufferEntry: WriteBufferEntry = { entryType: 'MESSAGE', message: writeObj, allocated: this.bufferTracker.allocate(message.length, this.callNumber) }; - this.writeBuffer[messageIndex] = bufferEntry; + this.writeBuffer.push(bufferEntry); if (bufferEntry.allocated) { context.callback?.(); for (const [callIndex, call] of this.underlyingCalls.entries()) { @@ -592,7 +620,11 @@ export class RetryingCall implements Call { } } else { this.commitCallWithMostMessages(); - const call = this.underlyingCalls[this.committedCallIndex!]; + // commitCallWithMostMessages can fail if we are between ping attempts + if (this.committedCallIndex === null) { + return; + } + const call = this.underlyingCalls[this.committedCallIndex]; bufferEntry.callback = context.callback; if (call.state === 'ACTIVE' && call.nextMessageToSend === messageIndex) { call.call.sendMessageWithContext({ @@ -615,11 +647,11 @@ export class RetryingCall implements Call { } halfClose(): void { this.trace('halfClose called'); - const halfCloseIndex = this.writeBuffer.length; - this.writeBuffer[halfCloseIndex] = { + const halfCloseIndex = this.getNextBufferIndex(); + this.writeBuffer.push({ entryType: 'HALF_CLOSE', allocated: false - }; + }); for (const call of this.underlyingCalls) { if (call?.state === 'ACTIVE' && call.nextMessageToSend === halfCloseIndex) { call.nextMessageToSend += 1; diff --git a/packages/grpc-js/src/server-call.ts b/packages/grpc-js/src/server-call.ts index ba129217..48186bc2 100644 --- a/packages/grpc-js/src/server-call.ts +++ b/packages/grpc-js/src/server-call.ts @@ -63,11 +63,13 @@ const deadlineUnitsToMs: DeadlineUnitIndexSignature = { u: 0.001, n: 0.000001, }; -const defaultResponseHeaders = { +const defaultCompressionHeaders = { // TODO(cjihrig): Remove these encoding headers from the default response // once compression is integrated. [GRPC_ACCEPT_ENCODING_HEADER]: 'identity,deflate,gzip', [GRPC_ENCODING_HEADER]: 'identity', +} +const defaultResponseHeaders = { [http2.constants.HTTP2_HEADER_STATUS]: http2.constants.HTTP_STATUS_OK, [http2.constants.HTTP2_HEADER_CONTENT_TYPE]: 'application/grpc+proto', }; @@ -500,7 +502,7 @@ export class Http2ServerCallStream< this.metadataSent = true; const custom = customMetadata ? customMetadata.toHttp2Headers() : null; // TODO(cjihrig): Include compression headers. - const headers = { ...defaultResponseHeaders, ...custom }; + const headers = { ...defaultResponseHeaders, ...defaultCompressionHeaders, ...custom }; this.stream.respond(headers, defaultResponseOptions); } @@ -725,9 +727,11 @@ export class Http2ServerCallStream< this.stream.end(); } } else { + // Trailers-only response const trailersToSend = { [GRPC_STATUS_HEADER]: statusObj.code, [GRPC_MESSAGE_HEADER]: encodeURI(statusObj.details), + ...defaultResponseHeaders, ...statusObj.metadata?.toHttp2Headers(), }; this.stream.respond(trailersToSend, {endStream: true}); diff --git a/packages/grpc-js/src/subchannel-address.ts b/packages/grpc-js/src/subchannel-address.ts index 29022caa..e542e645 100644 --- a/packages/grpc-js/src/subchannel-address.ts +++ b/packages/grpc-js/src/subchannel-address.ts @@ -41,9 +41,15 @@ export function isTcpSubchannelAddress( } export function subchannelAddressEqual( - address1: SubchannelAddress, - address2: SubchannelAddress + address1?: SubchannelAddress, + address2?: SubchannelAddress ): boolean { + if (!address1 && !address2) { + return true; + } + if (!address1 || !address2) { + return false; + } if (isTcpSubchannelAddress(address1)) { return ( isTcpSubchannelAddress(address2) && diff --git a/packages/grpc-js/src/subchannel-call.ts b/packages/grpc-js/src/subchannel-call.ts index 2bc6fb0c..969282e1 100644 --- a/packages/grpc-js/src/subchannel-call.ts +++ b/packages/grpc-js/src/subchannel-call.ts @@ -21,12 +21,12 @@ import * as os from 'os'; import { Status } from './constants'; import { Metadata } from './metadata'; import { StreamDecoder } from './stream-decoder'; -import { SubchannelCallStatsTracker, Subchannel } from './subchannel'; import * as logging from './logging'; import { LogVerbosity } from './constants'; import { ServerSurfaceCall } from './server-call'; import { Deadline } from './deadline'; import { InterceptingListener, MessageContext, StatusObject, WriteCallback } from './call-interface'; +import { CallEventTracker, Transport } from './transport'; const TRACER_NAME = 'subchannel_call'; @@ -87,6 +87,7 @@ export class Http2SubchannelCall implements SubchannelCall { private decoder = new StreamDecoder(); private isReadFilterPending = false; + private isPushPending = false; private canPush = false; /** * Indicates that an 'end' event has come from the http2 stream, so there @@ -104,26 +105,15 @@ export class Http2SubchannelCall implements SubchannelCall { // This is populated (non-null) if and only if the call has ended private finalStatus: StatusObject | null = null; - private disconnectListener: () => void; - private internalError: SystemError | null = null; constructor( private readonly http2Stream: http2.ClientHttp2Stream, - private readonly callStatsTracker: SubchannelCallStatsTracker, + private readonly callEventTracker: CallEventTracker, private readonly listener: SubchannelCallInterceptingListener, - private readonly subchannel: Subchannel, + private readonly transport: Transport, private readonly callId: number ) { - this.disconnectListener = () => { - this.endCall({ - code: Status.UNAVAILABLE, - details: 'Connection dropped', - metadata: new Metadata(), - }); - }; - subchannel.addDisconnectListener(this.disconnectListener); - subchannel.callRef(); http2Stream.on('response', (headers, flags) => { let headersString = ''; for (const header of Object.keys(headers)) { @@ -185,7 +175,7 @@ export class Http2SubchannelCall implements SubchannelCall { for (const message of messages) { this.trace('parsed message of length ' + message.length); - this.callStatsTracker!.addMessageReceived(); + this.callEventTracker!.addMessageReceived(); this.tryPush(message); } }); @@ -289,7 +279,15 @@ export class Http2SubchannelCall implements SubchannelCall { ); this.internalError = err; } - this.callStatsTracker.onStreamEnd(false); + this.callEventTracker.onStreamEnd(false); + }); + } + + public onDisconnect() { + this.endCall({ + code: Status.UNAVAILABLE, + details: 'Connection dropped', + metadata: new Metadata(), }); } @@ -304,7 +302,7 @@ export class Http2SubchannelCall implements SubchannelCall { this.finalStatus!.details + '"' ); - this.callStatsTracker.onCallEnd(this.finalStatus!); + this.callEventTracker.onCallEnd(this.finalStatus!); /* We delay the actual action of bubbling up the status to insulate the * cleanup code in this class from any errors that may be thrown in the * upper layers as a result of bubbling up the status. In particular, @@ -319,8 +317,6 @@ export class Http2SubchannelCall implements SubchannelCall { * not push more messages after the status is output, so the messages go * nowhere either way. */ this.http2Stream.resume(); - this.subchannel.callUnref(); - this.subchannel.removeDisconnectListener(this.disconnectListener); } } @@ -356,7 +352,8 @@ export class Http2SubchannelCall implements SubchannelCall { this.finalStatus.code !== Status.OK || (this.readsClosed && this.unpushedReadMessages.length === 0 && - !this.isReadFilterPending) + !this.isReadFilterPending && + !this.isPushPending) ) { this.outputStatus(); } @@ -369,7 +366,9 @@ export class Http2SubchannelCall implements SubchannelCall { (message instanceof Buffer ? message.length : null) ); this.canPush = false; + this.isPushPending = true; process.nextTick(() => { + this.isPushPending = false; /* If we have already output the status any later messages should be * ignored, and can cause out-of-order operation errors higher up in the * stack. Checking as late as possible here to avoid any race conditions. @@ -395,7 +394,7 @@ export class Http2SubchannelCall implements SubchannelCall { } private handleTrailers(headers: http2.IncomingHttpHeaders) { - this.callStatsTracker.onStreamEnd(true); + this.callEventTracker.onStreamEnd(true); let headersString = ''; for (const header of Object.keys(headers)) { headersString += '\t\t' + header + ': ' + headers[header] + '\n'; @@ -467,7 +466,7 @@ export class Http2SubchannelCall implements SubchannelCall { } getPeer(): string { - return this.subchannel.getAddress(); + return this.transport.getPeerName(); } getCallNumber(): number { @@ -506,7 +505,7 @@ export class Http2SubchannelCall implements SubchannelCall { context.callback?.(); }; this.trace('sending data chunk of length ' + message.length); - this.callStatsTracker.addMessageSent(); + this.callEventTracker.addMessageSent(); try { this.http2Stream!.write(message, cb); } catch (error) { diff --git a/packages/grpc-js/src/subchannel-interface.ts b/packages/grpc-js/src/subchannel-interface.ts index 082a8b3c..165ebc3e 100644 --- a/packages/grpc-js/src/subchannel-interface.ts +++ b/packages/grpc-js/src/subchannel-interface.ts @@ -22,7 +22,8 @@ import { Subchannel } from "./subchannel"; export type ConnectivityStateListener = ( subchannel: SubchannelInterface, previousState: ConnectivityState, - newState: ConnectivityState + newState: ConnectivityState, + keepaliveTime: number ) => void; /** @@ -40,6 +41,7 @@ export interface SubchannelInterface { removeConnectivityStateListener(listener: ConnectivityStateListener): void; startConnecting(): void; getAddress(): string; + throttleKeepalive(newKeepaliveTime: number): void; ref(): void; unref(): void; getChannelzRef(): SubchannelRef; @@ -67,6 +69,9 @@ export abstract class BaseSubchannelWrapper implements SubchannelInterface { getAddress(): string { return this.child.getAddress(); } + throttleKeepalive(newKeepaliveTime: number): void { + this.child.throttleKeepalive(newKeepaliveTime); + } ref(): void { this.child.ref(); } diff --git a/packages/grpc-js/src/subchannel-pool.ts b/packages/grpc-js/src/subchannel-pool.ts index b7ef362c..bbfbea02 100644 --- a/packages/grpc-js/src/subchannel-pool.ts +++ b/packages/grpc-js/src/subchannel-pool.ts @@ -23,6 +23,7 @@ import { } from './subchannel-address'; import { ChannelCredentials } from './channel-credentials'; import { GrpcUri, uriToString } from './uri-parser'; +import { Http2SubchannelConnector } from './transport'; // 10 seconds in milliseconds. This value is arbitrary. /** @@ -143,7 +144,8 @@ export class SubchannelPool { channelTargetUri, subchannelTarget, channelArguments, - channelCredentials + channelCredentials, + new Http2SubchannelConnector(channelTargetUri) ); if (!(channelTarget in this.pool)) { this.pool[channelTarget] = []; diff --git a/packages/grpc-js/src/subchannel.ts b/packages/grpc-js/src/subchannel.ts index 0d9773b3..c93e0c45 100644 --- a/packages/grpc-js/src/subchannel.ts +++ b/packages/grpc-js/src/subchannel.ts @@ -15,60 +15,30 @@ * */ -import * as http2 from 'http2'; import { ChannelCredentials } from './channel-credentials'; import { Metadata } from './metadata'; import { ChannelOptions } from './channel-options'; -import { PeerCertificate, checkServerIdentity, TLSSocket, CipherNameAndProtocol } from 'tls'; import { ConnectivityState } from './connectivity-state'; import { BackoffTimeout, BackoffOptions } from './backoff-timeout'; -import { getDefaultAuthority } from './resolver'; import * as logging from './logging'; import { LogVerbosity, Status } from './constants'; -import { getProxiedConnection, ProxyConnectionResult } from './http_proxy'; -import * as net from 'net'; -import { GrpcUri, parseUri, splitHostPort, uriToString } from './uri-parser'; -import { ConnectionOptions } from 'tls'; +import { GrpcUri, uriToString } from './uri-parser'; import { - stringToSubchannelAddress, SubchannelAddress, subchannelAddressToString, } from './subchannel-address'; -import { SubchannelRef, ChannelzTrace, ChannelzChildrenTracker, SubchannelInfo, registerChannelzSubchannel, ChannelzCallTracker, SocketInfo, SocketRef, unregisterChannelzRef, registerChannelzSocket, TlsInfo } from './channelz'; +import { SubchannelRef, ChannelzTrace, ChannelzChildrenTracker, SubchannelInfo, registerChannelzSubchannel, ChannelzCallTracker, unregisterChannelzRef } from './channelz'; import { ConnectivityStateListener } from './subchannel-interface'; -import { Http2SubchannelCall, SubchannelCallInterceptingListener } from './subchannel-call'; -import { getNextCallNumber } from './call-number'; +import { SubchannelCallInterceptingListener } from './subchannel-call'; import { SubchannelCall } from './subchannel-call'; -import { InterceptingListener, StatusObject } from './call-interface'; - -const clientVersion = require('../../package.json').version; +import { CallEventTracker, SubchannelConnector, Transport } from './transport'; const TRACER_NAME = 'subchannel'; -const FLOW_CONTROL_TRACER_NAME = 'subchannel_flowctrl'; /* setInterval and setTimeout only accept signed 32 bit integers. JS doesn't * have a constant for the max signed 32 bit integer, so this is a simple way * to calculate it */ const KEEPALIVE_MAX_TIME_MS = ~(1 << 31); -const KEEPALIVE_TIMEOUT_MS = 20000; - -export interface SubchannelCallStatsTracker { - addMessageSent(): void; - addMessageReceived(): void; - onCallEnd(status: StatusObject): void; - onStreamEnd(success: boolean): void; -} - -const { - HTTP2_HEADER_AUTHORITY, - HTTP2_HEADER_CONTENT_TYPE, - HTTP2_HEADER_METHOD, - HTTP2_HEADER_PATH, - HTTP2_HEADER_TE, - HTTP2_HEADER_USER_AGENT, -} = http2.constants; - -const tooManyPingsData: Buffer = Buffer.from('too_many_pings', 'ascii'); export class Subchannel { /** @@ -79,7 +49,7 @@ export class Subchannel { /** * The underlying http2 session used to make requests. */ - private session: http2.ClientHttp2Session | null = null; + private transport: Transport | null = null; /** * Indicates that the subchannel should transition from TRANSIENT_FAILURE to * CONNECTING instead of IDLE when the backoff timeout ends. @@ -92,45 +62,9 @@ export class Subchannel { */ private stateListeners: ConnectivityStateListener[] = []; - /** - * A list of listener functions that will be called when the underlying - * socket disconnects. Used for ending active calls with an UNAVAILABLE - * status. - */ - private disconnectListeners: Set<() => void> = new Set(); - private backoffTimeout: BackoffTimeout; - /** - * The complete user agent string constructed using channel args. - */ - private userAgent: string; - - /** - * The amount of time in between sending pings - */ - private keepaliveTimeMs: number = KEEPALIVE_MAX_TIME_MS; - /** - * The amount of time to wait for an acknowledgement after sending a ping - */ - private keepaliveTimeoutMs: number = KEEPALIVE_TIMEOUT_MS; - /** - * Timer reference for timeout that indicates when to send the next ping - */ - private keepaliveIntervalId: NodeJS.Timer; - /** - * Timer reference tracking when the most recent ping will be considered lost - */ - private keepaliveTimeoutId: NodeJS.Timer; - /** - * Indicates whether keepalive pings should be sent without any active calls - */ - private keepaliveWithoutCalls = false; - - /** - * Tracks calls with references to this subchannel - */ - private callRefcount = 0; + private keepaliveTime: number; /** * Tracks channels and subchannel pools with references to this subchannel */ @@ -149,18 +83,7 @@ export class Subchannel { private childrenTracker = new ChannelzChildrenTracker(); // Channelz socket info - private channelzSocketRef: SocketRef | null = null; - /** - * Name of the remote server, if it is not the same as the subchannel - * address, i.e. if connecting through an HTTP CONNECT proxy. - */ - private remoteName: string | null = null; private streamTracker = new ChannelzCallTracker(); - private keepalivesSent = 0; - private messagesSent = 0; - private messagesReceived = 0; - private lastMessageSentTimestamp: Date | null = null; - private lastMessageReceivedTimestamp: Date | null = null; /** * A class representing a connection to a single backend. @@ -176,33 +99,9 @@ export class Subchannel { private channelTarget: GrpcUri, private subchannelAddress: SubchannelAddress, private options: ChannelOptions, - private credentials: ChannelCredentials + private credentials: ChannelCredentials, + private connector: SubchannelConnector ) { - // Build user-agent string. - this.userAgent = [ - options['grpc.primary_user_agent'], - `grpc-node-js/${clientVersion}`, - options['grpc.secondary_user_agent'], - ] - .filter((e) => e) - .join(' '); // remove falsey values first - - if ('grpc.keepalive_time_ms' in options) { - this.keepaliveTimeMs = options['grpc.keepalive_time_ms']!; - } - if ('grpc.keepalive_timeout_ms' in options) { - this.keepaliveTimeoutMs = options['grpc.keepalive_timeout_ms']!; - } - if ('grpc.keepalive_permit_without_calls' in options) { - this.keepaliveWithoutCalls = - options['grpc.keepalive_permit_without_calls'] === 1; - } else { - this.keepaliveWithoutCalls = false; - } - this.keepaliveIntervalId = setTimeout(() => {}, 0); - clearTimeout(this.keepaliveIntervalId); - this.keepaliveTimeoutId = setTimeout(() => {}, 0); - clearTimeout(this.keepaliveTimeoutId); const backoffOptions: BackoffOptions = { initialDelay: options['grpc.initial_reconnect_backoff_ms'], maxDelay: options['grpc.max_reconnect_backoff_ms'], @@ -212,6 +111,8 @@ export class Subchannel { }, backoffOptions); this.subchannelAddressString = subchannelAddressToString(subchannelAddress); + this.keepaliveTime = options['grpc.keepalive_time_ms'] ?? -1; + if (options['grpc.enable_channelz'] === 0) { this.channelzEnabled = false; } @@ -233,67 +134,6 @@ export class Subchannel { }; } - private getChannelzSocketInfo(): SocketInfo | null { - if (this.session === null) { - return null; - } - const sessionSocket = this.session.socket; - const remoteAddress = sessionSocket.remoteAddress ? stringToSubchannelAddress(sessionSocket.remoteAddress, sessionSocket.remotePort) : null; - const localAddress = sessionSocket.localAddress ? stringToSubchannelAddress(sessionSocket.localAddress, sessionSocket.localPort) : null; - let tlsInfo: TlsInfo | null; - if (this.session.encrypted) { - const tlsSocket: TLSSocket = sessionSocket as TLSSocket; - const cipherInfo: CipherNameAndProtocol & {standardName?: string} = tlsSocket.getCipher(); - const certificate = tlsSocket.getCertificate(); - const peerCertificate = tlsSocket.getPeerCertificate(); - tlsInfo = { - cipherSuiteStandardName: cipherInfo.standardName ?? null, - cipherSuiteOtherName: cipherInfo.standardName ? null : cipherInfo.name, - localCertificate: (certificate && 'raw' in certificate) ? certificate.raw : null, - remoteCertificate: (peerCertificate && 'raw' in peerCertificate) ? peerCertificate.raw : null - }; - } else { - tlsInfo = null; - } - const socketInfo: SocketInfo = { - remoteAddress: remoteAddress, - localAddress: localAddress, - security: tlsInfo, - remoteName: this.remoteName, - streamsStarted: this.streamTracker.callsStarted, - streamsSucceeded: this.streamTracker.callsSucceeded, - streamsFailed: this.streamTracker.callsFailed, - messagesSent: this.messagesSent, - messagesReceived: this.messagesReceived, - keepAlivesSent: this.keepalivesSent, - lastLocalStreamCreatedTimestamp: this.streamTracker.lastCallStartedTimestamp, - lastRemoteStreamCreatedTimestamp: null, - lastMessageSentTimestamp: this.lastMessageSentTimestamp, - lastMessageReceivedTimestamp: this.lastMessageReceivedTimestamp, - localFlowControlWindow: this.session.state.localWindowSize ?? null, - remoteFlowControlWindow: this.session.state.remoteWindowSize ?? null - }; - return socketInfo; - } - - private resetChannelzSocketInfo() { - if (!this.channelzEnabled) { - return; - } - if (this.channelzSocketRef) { - unregisterChannelzRef(this.channelzSocketRef); - this.childrenTracker.unrefChild(this.channelzSocketRef); - this.channelzSocketRef = null; - } - this.remoteName = null; - this.streamTracker = new ChannelzCallTracker(); - this.keepalivesSent = 0; - this.messagesSent = 0; - this.messagesReceived = 0; - this.lastMessageSentTimestamp = null; - this.lastMessageReceivedTimestamp = null; - } - private trace(text: string): void { logging.trace(LogVerbosity.DEBUG, TRACER_NAME, '(' + this.channelzRef.id + ') ' + this.subchannelAddressString + ' ' + text); } @@ -302,18 +142,6 @@ export class Subchannel { logging.trace(LogVerbosity.DEBUG, 'subchannel_refcount', '(' + this.channelzRef.id + ') ' + this.subchannelAddressString + ' ' + text); } - private flowControlTrace(text: string): void { - logging.trace(LogVerbosity.DEBUG, FLOW_CONTROL_TRACER_NAME, '(' + this.channelzRef.id + ') ' + this.subchannelAddressString + ' ' + text); - } - - private internalsTrace(text: string): void { - logging.trace(LogVerbosity.DEBUG, 'subchannel_internals', '(' + this.channelzRef.id + ') ' + this.subchannelAddressString + ' ' + text); - } - - private keepaliveTrace(text: string): void { - logging.trace(LogVerbosity.DEBUG, 'keepalive', '(' + this.channelzRef.id + ') ' + this.subchannelAddressString + ' ' + text); - } - private handleBackoffTimer() { if (this.continueConnecting) { this.transitionToState( @@ -340,313 +168,39 @@ export class Subchannel { this.backoffTimeout.reset(); } - private sendPing() { - if (this.channelzEnabled) { - this.keepalivesSent += 1; - } - this.keepaliveTrace('Sending ping with timeout ' + this.keepaliveTimeoutMs + 'ms'); - this.keepaliveTimeoutId = setTimeout(() => { - this.keepaliveTrace('Ping timeout passed without response'); - this.handleDisconnect(); - }, this.keepaliveTimeoutMs); - this.keepaliveTimeoutId.unref?.(); - try { - this.session!.ping( - (err: Error | null, duration: number, payload: Buffer) => { - this.keepaliveTrace('Received ping response'); - clearTimeout(this.keepaliveTimeoutId); - } - ); - } catch (e) { - /* If we fail to send a ping, the connection is no longer functional, so - * we should discard it. */ - this.transitionToState( - [ConnectivityState.READY], - ConnectivityState.TRANSIENT_FAILURE - ); - } - } - - private startKeepalivePings() { - this.keepaliveIntervalId = setInterval(() => { - this.sendPing(); - }, this.keepaliveTimeMs); - this.keepaliveIntervalId.unref?.(); - /* Don't send a ping immediately because whatever caused us to start - * sending pings should also involve some network activity. */ - } - - /** - * Stop keepalive pings when terminating a connection. This discards the - * outstanding ping timeout, so it should not be called if the same - * connection will still be used. - */ - private stopKeepalivePings() { - clearInterval(this.keepaliveIntervalId); - clearTimeout(this.keepaliveTimeoutId); - } - - private createSession(proxyConnectionResult: ProxyConnectionResult) { - if (proxyConnectionResult.realTarget) { - this.remoteName = uriToString(proxyConnectionResult.realTarget); - this.trace('creating HTTP/2 session through proxy to ' + proxyConnectionResult.realTarget); - } else { - this.remoteName = null; - this.trace('creating HTTP/2 session'); - } - const targetAuthority = getDefaultAuthority( - proxyConnectionResult.realTarget ?? this.channelTarget - ); - let connectionOptions: http2.SecureClientSessionOptions = - this.credentials._getConnectionOptions() || {}; - connectionOptions.maxSendHeaderBlockLength = Number.MAX_SAFE_INTEGER; - if ('grpc-node.max_session_memory' in this.options) { - connectionOptions.maxSessionMemory = this.options[ - 'grpc-node.max_session_memory' - ]; - } else { - /* By default, set a very large max session memory limit, to effectively - * disable enforcement of the limit. Some testing indicates that Node's - * behavior degrades badly when this limit is reached, so we solve that - * by disabling the check entirely. */ - connectionOptions.maxSessionMemory = Number.MAX_SAFE_INTEGER; - } - let addressScheme = 'http://'; - if ('secureContext' in connectionOptions) { - addressScheme = 'https://'; - // If provided, the value of grpc.ssl_target_name_override should be used - // to override the target hostname when checking server identity. - // This option is used for testing only. - if (this.options['grpc.ssl_target_name_override']) { - const sslTargetNameOverride = this.options[ - 'grpc.ssl_target_name_override' - ]!; - connectionOptions.checkServerIdentity = ( - host: string, - cert: PeerCertificate - ): Error | undefined => { - return checkServerIdentity(sslTargetNameOverride, cert); - }; - connectionOptions.servername = sslTargetNameOverride; - } else { - const authorityHostname = - splitHostPort(targetAuthority)?.host ?? 'localhost'; - // We want to always set servername to support SNI - connectionOptions.servername = authorityHostname; - } - if (proxyConnectionResult.socket) { - /* This is part of the workaround for - * https://github.com/nodejs/node/issues/32922. Without that bug, - * proxyConnectionResult.socket would always be a plaintext socket and - * this would say - * connectionOptions.socket = proxyConnectionResult.socket; */ - connectionOptions.createConnection = (authority, option) => { - return proxyConnectionResult.socket!; - }; - } - } else { - /* In all but the most recent versions of Node, http2.connect does not use - * the options when establishing plaintext connections, so we need to - * establish that connection explicitly. */ - connectionOptions.createConnection = (authority, option) => { - if (proxyConnectionResult.socket) { - return proxyConnectionResult.socket; - } else { - /* net.NetConnectOpts is declared in a way that is more restrictive - * than what net.connect will actually accept, so we use the type - * assertion to work around that. */ - return net.connect(this.subchannelAddress); - } - }; - } - - connectionOptions = { - ...connectionOptions, - ...this.subchannelAddress, - }; - - /* http2.connect uses the options here: - * https://github.com/nodejs/node/blob/70c32a6d190e2b5d7b9ff9d5b6a459d14e8b7d59/lib/internal/http2/core.js#L3028-L3036 - * The spread operator overides earlier values with later ones, so any port - * or host values in the options will be used rather than any values extracted - * from the first argument. In addition, the path overrides the host and port, - * as documented for plaintext connections here: - * https://nodejs.org/api/net.html#net_socket_connect_options_connectlistener - * and for TLS connections here: - * https://nodejs.org/api/tls.html#tls_tls_connect_options_callback. In - * earlier versions of Node, http2.connect passes these options to - * tls.connect but not net.connect, so in the insecure case we still need - * to set the createConnection option above to create the connection - * explicitly. We cannot do that in the TLS case because http2.connect - * passes necessary additional options to tls.connect. - * The first argument just needs to be parseable as a URL and the scheme - * determines whether the connection will be established over TLS or not. - */ - const session = http2.connect( - addressScheme + targetAuthority, - connectionOptions - ); - this.session = session; - this.channelzSocketRef = registerChannelzSocket(this.subchannelAddressString, () => this.getChannelzSocketInfo()!, this.channelzEnabled); - if (this.channelzEnabled) { - this.childrenTracker.refChild(this.channelzSocketRef); - } - session.unref(); - /* For all of these events, check if the session at the time of the event - * is the same one currently attached to this subchannel, to ensure that - * old events from previous connection attempts cannot cause invalid state - * transitions. */ - session.once('connect', () => { - if (this.session === session) { - this.transitionToState( - [ConnectivityState.CONNECTING], - ConnectivityState.READY - ); - } - }); - session.once('close', () => { - if (this.session === session) { - this.trace('connection closed'); - this.transitionToState( - [ConnectivityState.CONNECTING], - ConnectivityState.TRANSIENT_FAILURE - ); - /* Transitioning directly to IDLE here should be OK because we are not - * doing any backoff, because a connection was established at some - * point */ - this.transitionToState( - [ConnectivityState.READY], - ConnectivityState.IDLE - ); - } - }); - session.once( - 'goaway', - (errorCode: number, lastStreamID: number, opaqueData: Buffer) => { - if (this.session === session) { - /* See the last paragraph of - * https://github.com/grpc/proposal/blob/master/A8-client-side-keepalive.md#basic-keepalive */ - if ( - errorCode === http2.constants.NGHTTP2_ENHANCE_YOUR_CALM && - opaqueData.equals(tooManyPingsData) - ) { - this.keepaliveTimeMs = Math.min( - 2 * this.keepaliveTimeMs, - KEEPALIVE_MAX_TIME_MS - ); - logging.log( - LogVerbosity.ERROR, - `Connection to ${uriToString(this.channelTarget)} at ${ - this.subchannelAddressString - } rejected by server because of excess pings. Increasing ping interval to ${ - this.keepaliveTimeMs - } ms` - ); - } - this.trace( - 'connection closed by GOAWAY with code ' + - errorCode - ); - this.transitionToState( - [ConnectivityState.CONNECTING, ConnectivityState.READY], - ConnectivityState.IDLE - ); - } - } - ); - session.once('error', (error) => { - /* Do nothing here. Any error should also trigger a close event, which is - * where we want to handle that. */ - this.trace( - 'connection closed with error ' + - (error as Error).message - ); - }); - if (logging.isTracerEnabled(TRACER_NAME)) { - session.on('remoteSettings', (settings: http2.Settings) => { - this.trace( - 'new settings received' + - (this.session !== session ? ' on the old connection' : '') + - ': ' + - JSON.stringify(settings) - ); - }); - session.on('localSettings', (settings: http2.Settings) => { - this.trace( - 'local settings acknowledged by remote' + - (this.session !== session ? ' on the old connection' : '') + - ': ' + - JSON.stringify(settings) - ); - }); - } - } - private startConnectingInternal() { - /* Pass connection options through to the proxy so that it's able to - * upgrade it's connection to support tls if needed. - * This is a workaround for https://github.com/nodejs/node/issues/32922 - * See https://github.com/grpc/grpc-node/pull/1369 for more info. */ - const connectionOptions: ConnectionOptions = - this.credentials._getConnectionOptions() || {}; - - if ('secureContext' in connectionOptions) { - connectionOptions.ALPNProtocols = ['h2']; - // If provided, the value of grpc.ssl_target_name_override should be used - // to override the target hostname when checking server identity. - // This option is used for testing only. - if (this.options['grpc.ssl_target_name_override']) { - const sslTargetNameOverride = this.options[ - 'grpc.ssl_target_name_override' - ]!; - connectionOptions.checkServerIdentity = ( - host: string, - cert: PeerCertificate - ): Error | undefined => { - return checkServerIdentity(sslTargetNameOverride, cert); - }; - connectionOptions.servername = sslTargetNameOverride; - } else { - if ('grpc.http_connect_target' in this.options) { - /* This is more or less how servername will be set in createSession - * if a connection is successfully established through the proxy. - * If the proxy is not used, these connectionOptions are discarded - * anyway */ - const targetPath = getDefaultAuthority( - parseUri(this.options['grpc.http_connect_target'] as string) ?? { - path: 'localhost', + let options = this.options; + if (options['grpc.keepalive_time_ms']) { + const adjustedKeepaliveTime = Math.min(this.keepaliveTime, KEEPALIVE_MAX_TIME_MS); + options = {...options, 'grpc.keepalive_time_ms': adjustedKeepaliveTime}; + } + this.connector.connect(this.subchannelAddress, this.credentials, options).then( + transport => { + if (this.transitionToState([ConnectivityState.CONNECTING], ConnectivityState.READY)) { + this.transport = transport; + if (this.channelzEnabled) { + this.childrenTracker.refChild(transport.getChannelzRef()); + } + transport.addDisconnectListener((tooManyPings) => { + this.transitionToState([ConnectivityState.READY], ConnectivityState.IDLE); + if (tooManyPings && this.keepaliveTime > 0) { + this.keepaliveTime *= 2; + logging.log( + LogVerbosity.ERROR, + `Connection to ${uriToString(this.channelTarget)} at ${ + this.subchannelAddressString + } rejected by server because of excess pings. Increasing ping interval to ${ + this.keepaliveTime + } ms` + ); } - ); - const hostPort = splitHostPort(targetPath); - connectionOptions.servername = hostPort?.host ?? targetPath; + }); } - } - } - - getProxiedConnection( - this.subchannelAddress, - this.options, - connectionOptions - ).then( - (result) => { - this.createSession(result); }, - (reason) => { - this.transitionToState( - [ConnectivityState.CONNECTING], - ConnectivityState.TRANSIENT_FAILURE - ); + error => { + this.transitionToState([ConnectivityState.CONNECTING], ConnectivityState.TRANSIENT_FAILURE); } - ); - } - - private handleDisconnect() { - this.transitionToState( - [ConnectivityState.READY], - ConnectivityState.TRANSIENT_FAILURE); - for (const listener of this.disconnectListeners.values()) { - listener(); - } + ) } /** @@ -676,15 +230,6 @@ export class Subchannel { switch (newState) { case ConnectivityState.READY: this.stopBackoff(); - const session = this.session!; - session.socket.once('close', () => { - if (this.session === session) { - this.handleDisconnect(); - } - }); - if (this.keepaliveWithoutCalls) { - this.startKeepalivePings(); - } break; case ConnectivityState.CONNECTING: this.startBackoff(); @@ -692,12 +237,11 @@ export class Subchannel { this.continueConnecting = false; break; case ConnectivityState.TRANSIENT_FAILURE: - if (this.session) { - this.session.close(); + if (this.channelzEnabled && this.transport) { + this.childrenTracker.unrefChild(this.transport.getChannelzRef()); } - this.session = null; - this.resetChannelzSocketInfo(); - this.stopKeepalivePings(); + this.transport?.shutdown(); + this.transport = null; /* If the backoff timer has already ended by the time we get to the * TRANSIENT_FAILURE state, we want to immediately transition out of * TRANSIENT_FAILURE as though the backoff timer is ending right now */ @@ -708,12 +252,11 @@ export class Subchannel { } break; case ConnectivityState.IDLE: - if (this.session) { - this.session.close(); + if (this.channelzEnabled && this.transport) { + this.childrenTracker.unrefChild(this.transport.getChannelzRef()); } - this.session = null; - this.resetChannelzSocketInfo(); - this.stopKeepalivePings(); + this.transport?.shutdown(); + this.transport = null; break; default: throw new Error(`Invalid state: unknown ConnectivityState ${newState}`); @@ -721,71 +264,11 @@ export class Subchannel { /* We use a shallow copy of the stateListeners array in case a listener * is removed during this iteration */ for (const listener of [...this.stateListeners]) { - listener(this, previousState, newState); + listener(this, previousState, newState, this.keepaliveTime); } return true; } - /** - * Check if the subchannel associated with zero calls and with zero channels. - * If so, shut it down. - */ - private checkBothRefcounts() { - /* If no calls, channels, or subchannel pools have any more references to - * this subchannel, we can be sure it will never be used again. */ - if (this.callRefcount === 0 && this.refcount === 0) { - if (this.channelzEnabled) { - this.channelzTrace.addTrace('CT_INFO', 'Shutting down'); - } - this.transitionToState( - [ConnectivityState.CONNECTING, ConnectivityState.READY], - ConnectivityState.IDLE - ); - if (this.channelzEnabled) { - unregisterChannelzRef(this.channelzRef); - } - } - } - - callRef() { - this.refTrace( - 'callRefcount ' + - this.callRefcount + - ' -> ' + - (this.callRefcount + 1) - ); - if (this.callRefcount === 0) { - if (this.session) { - this.session.ref(); - } - this.backoffTimeout.ref(); - if (!this.keepaliveWithoutCalls) { - this.startKeepalivePings(); - } - } - this.callRefcount += 1; - } - - callUnref() { - this.refTrace( - 'callRefcount ' + - this.callRefcount + - ' -> ' + - (this.callRefcount - 1) - ); - this.callRefcount -= 1; - if (this.callRefcount === 0) { - if (this.session) { - this.session.unref(); - } - this.backoffTimeout.unref(); - if (!this.keepaliveWithoutCalls) { - clearInterval(this.keepaliveIntervalId); - } - this.checkBothRefcounts(); - } - } - ref() { this.refTrace( 'refcount ' + @@ -804,7 +287,18 @@ export class Subchannel { (this.refcount - 1) ); this.refcount -= 1; - this.checkBothRefcounts(); + if (this.refcount === 0) { + if (this.channelzEnabled) { + this.channelzTrace.addTrace('CT_INFO', 'Shutting down'); + } + this.transitionToState( + [ConnectivityState.CONNECTING, ConnectivityState.READY], + ConnectivityState.IDLE + ); + if (this.channelzEnabled) { + unregisterChannelzRef(this.channelzRef); + } + } } unrefIfOneRef(): boolean { @@ -816,83 +310,26 @@ export class Subchannel { } createCall(metadata: Metadata, host: string, method: string, listener: SubchannelCallInterceptingListener): SubchannelCall { - const headers = metadata.toHttp2Headers(); - headers[HTTP2_HEADER_AUTHORITY] = host; - headers[HTTP2_HEADER_USER_AGENT] = this.userAgent; - headers[HTTP2_HEADER_CONTENT_TYPE] = 'application/grpc'; - headers[HTTP2_HEADER_METHOD] = 'POST'; - headers[HTTP2_HEADER_PATH] = method; - headers[HTTP2_HEADER_TE] = 'trailers'; - let http2Stream: http2.ClientHttp2Stream; - /* In theory, if an error is thrown by session.request because session has - * become unusable (e.g. because it has received a goaway), this subchannel - * should soon see the corresponding close or goaway event anyway and leave - * READY. But we have seen reports that this does not happen - * (https://github.com/googleapis/nodejs-firestore/issues/1023#issuecomment-653204096) - * so for defense in depth, we just discard the session when we see an - * error here. - */ - try { - http2Stream = this.session!.request(headers); - } catch (e) { - this.transitionToState( - [ConnectivityState.READY], - ConnectivityState.TRANSIENT_FAILURE - ); - throw e; + if (!this.transport) { + throw new Error('Cannot create call, subchannel not READY'); } - this.flowControlTrace( - 'local window size: ' + - this.session!.state.localWindowSize + - ' remote window size: ' + - this.session!.state.remoteWindowSize - ); - const streamSession = this.session; - this.internalsTrace( - 'session.closed=' + - streamSession!.closed + - ' session.destroyed=' + - streamSession!.destroyed + - ' session.socket.destroyed=' + - streamSession!.socket.destroyed); - let statsTracker: SubchannelCallStatsTracker; + let statsTracker: Partial; if (this.channelzEnabled) { this.callTracker.addCallStarted(); this.streamTracker.addCallStarted(); statsTracker = { - addMessageSent: () => { - this.messagesSent += 1; - this.lastMessageSentTimestamp = new Date(); - }, - addMessageReceived: () => { - this.messagesReceived += 1; - }, onCallEnd: status => { if (status.code === Status.OK) { this.callTracker.addCallSucceeded(); } else { this.callTracker.addCallFailed(); } - }, - onStreamEnd: success => { - if (streamSession === this.session) { - if (success) { - this.streamTracker.addCallSucceeded(); - } else { - this.streamTracker.addCallFailed(); - } - } } } } else { - statsTracker = { - addMessageSent: () => {}, - addMessageReceived: () => {}, - onCallEnd: () => {}, - onStreamEnd: () => {} - } + statsTracker = {}; } - return new Http2SubchannelCall(http2Stream, statsTracker, listener, this, getNextCallNumber()); + return this.transport.createCall(metadata, host, method, listener, statsTracker); } /** @@ -946,14 +383,6 @@ export class Subchannel { } } - addDisconnectListener(listener: () => void) { - this.disconnectListeners.add(listener); - } - - removeDisconnectListener(listener: () => void) { - this.disconnectListeners.delete(listener); - } - /** * Reset the backoff timeout, and immediately start connecting if in backoff. */ @@ -976,4 +405,10 @@ export class Subchannel { getRealSubchannel(): this { return this; } + + throttleKeepalive(newKeepaliveTime: number) { + if (newKeepaliveTime > this.keepaliveTime) { + this.keepaliveTime = newKeepaliveTime; + } + } } diff --git a/packages/grpc-js/src/transport.ts b/packages/grpc-js/src/transport.ts new file mode 100644 index 00000000..8abc13ab --- /dev/null +++ b/packages/grpc-js/src/transport.ts @@ -0,0 +1,669 @@ +/* + * Copyright 2023 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +import * as http2 from 'http2'; +import { checkServerIdentity, CipherNameAndProtocol, ConnectionOptions, PeerCertificate, TLSSocket } from 'tls'; +import { StatusObject } from './call-interface'; +import { ChannelCredentials } from './channel-credentials'; +import { ChannelOptions } from './channel-options'; +import { ChannelzCallTracker, registerChannelzSocket, SocketInfo, SocketRef, TlsInfo, unregisterChannelzRef } from './channelz'; +import { LogVerbosity } from './constants'; +import { getProxiedConnection, ProxyConnectionResult } from './http_proxy'; +import * as logging from './logging'; +import { getDefaultAuthority } from './resolver'; +import { stringToSubchannelAddress, SubchannelAddress, subchannelAddressToString } from './subchannel-address'; +import { GrpcUri, parseUri, splitHostPort, uriToString } from './uri-parser'; +import * as net from 'net'; +import { Http2SubchannelCall, SubchannelCall, SubchannelCallInterceptingListener } from './subchannel-call'; +import { Metadata } from './metadata'; +import { getNextCallNumber } from './call-number'; + +const TRACER_NAME = 'transport'; +const FLOW_CONTROL_TRACER_NAME = 'transport_flowctrl'; + +const clientVersion = require('../../package.json').version; + +const { + HTTP2_HEADER_AUTHORITY, + HTTP2_HEADER_CONTENT_TYPE, + HTTP2_HEADER_METHOD, + HTTP2_HEADER_PATH, + HTTP2_HEADER_TE, + HTTP2_HEADER_USER_AGENT, +} = http2.constants; + +/* setInterval and setTimeout only accept signed 32 bit integers. JS doesn't + * have a constant for the max signed 32 bit integer, so this is a simple way + * to calculate it */ +const KEEPALIVE_MAX_TIME_MS = ~(1 << 31); +const KEEPALIVE_TIMEOUT_MS = 20000; + +export interface CallEventTracker { + addMessageSent(): void; + addMessageReceived(): void; + onCallEnd(status: StatusObject): void; + onStreamEnd(success: boolean): void; +} + +export interface TransportDisconnectListener { + (tooManyPings: boolean): void; +} + +export interface Transport { + getChannelzRef(): SocketRef; + getPeerName(): string; + createCall(metadata: Metadata, host: string, method: string, listener: SubchannelCallInterceptingListener, subchannelCallStatsTracker: Partial): SubchannelCall; + addDisconnectListener(listener: TransportDisconnectListener): void; + shutdown(): void; +} + +const tooManyPingsData: Buffer = Buffer.from('too_many_pings', 'ascii'); + +class Http2Transport implements Transport { + /** + * The amount of time in between sending pings + */ + private keepaliveTimeMs: number = -1; + /** + * The amount of time to wait for an acknowledgement after sending a ping + */ + private keepaliveTimeoutMs: number = KEEPALIVE_TIMEOUT_MS; + /** + * Timer reference for timeout that indicates when to send the next ping + */ + private keepaliveIntervalId: NodeJS.Timer; + /** + * Timer reference tracking when the most recent ping will be considered lost + */ + private keepaliveTimeoutId: NodeJS.Timer | null = null; + /** + * Indicates whether keepalive pings should be sent without any active calls + */ + private keepaliveWithoutCalls = false; + + private userAgent: string; + + private activeCalls: Set = new Set(); + + private subchannelAddressString: string; + + private disconnectListeners: TransportDisconnectListener[] = []; + + private disconnectHandled = false; + + // Channelz info + private channelzRef: SocketRef; + private readonly channelzEnabled: boolean = true; + /** + * Name of the remote server, if it is not the same as the subchannel + * address, i.e. if connecting through an HTTP CONNECT proxy. + */ + private remoteName: string | null = null; + private streamTracker = new ChannelzCallTracker(); + private keepalivesSent = 0; + private messagesSent = 0; + private messagesReceived = 0; + private lastMessageSentTimestamp: Date | null = null; + private lastMessageReceivedTimestamp: Date | null = null; + + constructor( + private session: http2.ClientHttp2Session, + subchannelAddress: SubchannelAddress, + options: ChannelOptions + ) { + // Build user-agent string. + this.userAgent = [ + options['grpc.primary_user_agent'], + `grpc-node-js/${clientVersion}`, + options['grpc.secondary_user_agent'], + ] + .filter((e) => e) + .join(' '); // remove falsey values first + + if ('grpc.keepalive_time_ms' in options) { + this.keepaliveTimeMs = options['grpc.keepalive_time_ms']!; + } + if ('grpc.keepalive_timeout_ms' in options) { + this.keepaliveTimeoutMs = options['grpc.keepalive_timeout_ms']!; + } + if ('grpc.keepalive_permit_without_calls' in options) { + this.keepaliveWithoutCalls = + options['grpc.keepalive_permit_without_calls'] === 1; + } else { + this.keepaliveWithoutCalls = false; + } + this.keepaliveIntervalId = setTimeout(() => {}, 0); + clearTimeout(this.keepaliveIntervalId); + if (this.keepaliveWithoutCalls) { + this.startKeepalivePings(); + } + + this.subchannelAddressString = subchannelAddressToString(subchannelAddress); + + if (options['grpc.enable_channelz'] === 0) { + this.channelzEnabled = false; + } + this.channelzRef = registerChannelzSocket(this.subchannelAddressString, () => this.getChannelzInfo(), this.channelzEnabled); + + session.once('close', () => { + this.trace('session closed'); + this.stopKeepalivePings(); + this.handleDisconnect(); + }); + session.once('goaway', (errorCode: number, lastStreamID: number, opaqueData: Buffer) => { + let tooManyPings = false; + /* See the last paragraph of + * https://github.com/grpc/proposal/blob/master/A8-client-side-keepalive.md#basic-keepalive */ + if ( + errorCode === http2.constants.NGHTTP2_ENHANCE_YOUR_CALM && + opaqueData.equals(tooManyPingsData) + ) { + tooManyPings = true; + } + this.trace( + 'connection closed by GOAWAY with code ' + + errorCode + ); + this.reportDisconnectToOwner(tooManyPings); + }); + session.once('error', error => { + /* Do nothing here. Any error should also trigger a close event, which is + * where we want to handle that. */ + this.trace( + 'connection closed with error ' + + (error as Error).message + ); + }); + if (logging.isTracerEnabled(TRACER_NAME)) { + session.on('remoteSettings', (settings: http2.Settings) => { + this.trace( + 'new settings received' + + (this.session !== session ? ' on the old connection' : '') + + ': ' + + JSON.stringify(settings) + ); + }); + session.on('localSettings', (settings: http2.Settings) => { + this.trace( + 'local settings acknowledged by remote' + + (this.session !== session ? ' on the old connection' : '') + + ': ' + + JSON.stringify(settings) + ); + }); + } + } + + private getChannelzInfo(): SocketInfo { + const sessionSocket = this.session.socket; + const remoteAddress = sessionSocket.remoteAddress ? stringToSubchannelAddress(sessionSocket.remoteAddress, sessionSocket.remotePort) : null; + const localAddress = sessionSocket.localAddress ? stringToSubchannelAddress(sessionSocket.localAddress, sessionSocket.localPort) : null; + let tlsInfo: TlsInfo | null; + if (this.session.encrypted) { + const tlsSocket: TLSSocket = sessionSocket as TLSSocket; + const cipherInfo: CipherNameAndProtocol & {standardName?: string} = tlsSocket.getCipher(); + const certificate = tlsSocket.getCertificate(); + const peerCertificate = tlsSocket.getPeerCertificate(); + tlsInfo = { + cipherSuiteStandardName: cipherInfo.standardName ?? null, + cipherSuiteOtherName: cipherInfo.standardName ? null : cipherInfo.name, + localCertificate: (certificate && 'raw' in certificate) ? certificate.raw : null, + remoteCertificate: (peerCertificate && 'raw' in peerCertificate) ? peerCertificate.raw : null + }; + } else { + tlsInfo = null; + } + const socketInfo: SocketInfo = { + remoteAddress: remoteAddress, + localAddress: localAddress, + security: tlsInfo, + remoteName: this.remoteName, + streamsStarted: this.streamTracker.callsStarted, + streamsSucceeded: this.streamTracker.callsSucceeded, + streamsFailed: this.streamTracker.callsFailed, + messagesSent: this.messagesSent, + messagesReceived: this.messagesReceived, + keepAlivesSent: this.keepalivesSent, + lastLocalStreamCreatedTimestamp: this.streamTracker.lastCallStartedTimestamp, + lastRemoteStreamCreatedTimestamp: null, + lastMessageSentTimestamp: this.lastMessageSentTimestamp, + lastMessageReceivedTimestamp: this.lastMessageReceivedTimestamp, + localFlowControlWindow: this.session.state.localWindowSize ?? null, + remoteFlowControlWindow: this.session.state.remoteWindowSize ?? null + }; + return socketInfo; + } + + private trace(text: string): void { + logging.trace(LogVerbosity.DEBUG, TRACER_NAME, '(' + this.channelzRef.id + ') ' + this.subchannelAddressString + ' ' + text); + } + + private keepaliveTrace(text: string): void { + logging.trace(LogVerbosity.DEBUG, 'keepalive', '(' + this.channelzRef.id + ') ' + this.subchannelAddressString + ' ' + text); + } + + private flowControlTrace(text: string): void { + logging.trace(LogVerbosity.DEBUG, FLOW_CONTROL_TRACER_NAME, '(' + this.channelzRef.id + ') ' + this.subchannelAddressString + ' ' + text); + } + + private internalsTrace(text: string): void { + logging.trace(LogVerbosity.DEBUG, 'transport_internals', '(' + this.channelzRef.id + ') ' + this.subchannelAddressString + ' ' + text); + } + + /** + * Indicate to the owner of this object that this transport should no longer + * be used. That happens if the connection drops, or if the server sends a + * GOAWAY. + * @param tooManyPings If true, this was triggered by a GOAWAY with data + * indicating that the session was closed becaues the client sent too many + * pings. + * @returns + */ + private reportDisconnectToOwner(tooManyPings: boolean) { + if (this.disconnectHandled) { + return; + } + this.disconnectHandled = true; + this.disconnectListeners.forEach(listener => listener(tooManyPings)); + } + + /** + * Handle connection drops, but not GOAWAYs. + */ + private handleDisconnect() { + this.reportDisconnectToOwner(false); + /* Give calls an event loop cycle to finish naturally before reporting the + * disconnnection to them. */ + setImmediate(() => { + for (const call of this.activeCalls) { + call.onDisconnect(); + } + }); + } + + addDisconnectListener(listener: TransportDisconnectListener): void { + this.disconnectListeners.push(listener); + } + + private clearKeepaliveTimeout() { + if (!this.keepaliveTimeoutId) { + return; + } + clearTimeout(this.keepaliveTimeoutId); + this.keepaliveTimeoutId = null; + } + + private sendPing() { + if (this.channelzEnabled) { + this.keepalivesSent += 1; + } + this.keepaliveTrace('Sending ping with timeout ' + this.keepaliveTimeoutMs + 'ms'); + if (!this.keepaliveTimeoutId) { + this.keepaliveTimeoutId = setTimeout(() => { + this.keepaliveTrace('Ping timeout passed without response'); + this.handleDisconnect(); + }, this.keepaliveTimeoutMs); + this.keepaliveTimeoutId.unref?.(); + } + try { + this.session!.ping( + (err: Error | null, duration: number, payload: Buffer) => { + this.keepaliveTrace('Received ping response'); + this.clearKeepaliveTimeout(); + } + ); + } catch (e) { + /* If we fail to send a ping, the connection is no longer functional, so + * we should discard it. */ + this.handleDisconnect(); + } + } + + private startKeepalivePings() { + if (this.keepaliveTimeMs < 0) { + return; + } + this.keepaliveIntervalId = setInterval(() => { + this.sendPing(); + }, this.keepaliveTimeMs); + this.keepaliveIntervalId.unref?.(); + /* Don't send a ping immediately because whatever caused us to start + * sending pings should also involve some network activity. */ + } + + /** + * Stop keepalive pings when terminating a connection. This discards the + * outstanding ping timeout, so it should not be called if the same + * connection will still be used. + */ + private stopKeepalivePings() { + clearInterval(this.keepaliveIntervalId); + this.clearKeepaliveTimeout(); + } + + private removeActiveCall(call: Http2SubchannelCall) { + this.activeCalls.delete(call); + if (this.activeCalls.size === 0) { + this.session.unref(); + if (!this.keepaliveWithoutCalls) { + this.stopKeepalivePings(); + } + } + } + + private addActiveCall(call: Http2SubchannelCall) { + if (this.activeCalls.size === 0) { + this.session.ref(); + if (!this.keepaliveWithoutCalls) { + this.startKeepalivePings(); + } + } + this.activeCalls.add(call); + } + + createCall(metadata: Metadata, host: string, method: string, listener: SubchannelCallInterceptingListener, subchannelCallStatsTracker: Partial): Http2SubchannelCall { + const headers = metadata.toHttp2Headers(); + headers[HTTP2_HEADER_AUTHORITY] = host; + headers[HTTP2_HEADER_USER_AGENT] = this.userAgent; + headers[HTTP2_HEADER_CONTENT_TYPE] = 'application/grpc'; + headers[HTTP2_HEADER_METHOD] = 'POST'; + headers[HTTP2_HEADER_PATH] = method; + headers[HTTP2_HEADER_TE] = 'trailers'; + let http2Stream: http2.ClientHttp2Stream; + /* In theory, if an error is thrown by session.request because session has + * become unusable (e.g. because it has received a goaway), this subchannel + * should soon see the corresponding close or goaway event anyway and leave + * READY. But we have seen reports that this does not happen + * (https://github.com/googleapis/nodejs-firestore/issues/1023#issuecomment-653204096) + * so for defense in depth, we just discard the session when we see an + * error here. + */ + try { + http2Stream = this.session!.request(headers); + } catch (e) { + this.handleDisconnect(); + throw e; + } + this.flowControlTrace( + 'local window size: ' + + this.session.state.localWindowSize + + ' remote window size: ' + + this.session.state.remoteWindowSize + ); + this.internalsTrace( + 'session.closed=' + + this.session.closed + + ' session.destroyed=' + + this.session.destroyed + + ' session.socket.destroyed=' + + this.session.socket.destroyed); + let eventTracker: CallEventTracker; + let call: Http2SubchannelCall; + if (this.channelzEnabled) { + this.streamTracker.addCallStarted(); + eventTracker = { + addMessageSent: () => { + this.messagesSent += 1; + this.lastMessageSentTimestamp = new Date(); + subchannelCallStatsTracker.addMessageSent?.(); + }, + addMessageReceived: () => { + this.messagesReceived += 1; + this.lastMessageReceivedTimestamp = new Date(); + subchannelCallStatsTracker.addMessageReceived?.(); + }, + onCallEnd: status => { + subchannelCallStatsTracker.onCallEnd?.(status); + this.removeActiveCall(call); + }, + onStreamEnd: success => { + if (success) { + this.streamTracker.addCallSucceeded(); + } else { + this.streamTracker.addCallFailed(); + } + subchannelCallStatsTracker.onStreamEnd?.(success); + } + } + } else { + eventTracker = { + addMessageSent: () => { + subchannelCallStatsTracker.addMessageSent?.(); + }, + addMessageReceived: () => { + subchannelCallStatsTracker.addMessageReceived?.(); + }, + onCallEnd: (status) => { + subchannelCallStatsTracker.onCallEnd?.(status); + this.removeActiveCall(call); + }, + onStreamEnd: (success) => { + subchannelCallStatsTracker.onStreamEnd?.(success); + } + } + } + call = new Http2SubchannelCall(http2Stream, eventTracker, listener, this, getNextCallNumber()); + this.addActiveCall(call); + return call; + } + + getChannelzRef(): SocketRef { + return this.channelzRef; + } + + getPeerName() { + return this.subchannelAddressString; + } + + shutdown() { + this.session.close(); + unregisterChannelzRef(this.channelzRef); + } +} + +export interface SubchannelConnector { + connect(address: SubchannelAddress, credentials: ChannelCredentials, options: ChannelOptions): Promise; + shutdown(): void; +} + +export class Http2SubchannelConnector implements SubchannelConnector { + private session: http2.ClientHttp2Session | null = null; + private isShutdown = false; + constructor(private channelTarget: GrpcUri) {} + private trace(text: string) { + + } + private createSession(address: SubchannelAddress, credentials: ChannelCredentials, options: ChannelOptions, proxyConnectionResult: ProxyConnectionResult): Promise { + if (this.isShutdown) { + return Promise.reject(); + } + return new Promise((resolve, reject) => { + let remoteName: string | null; + if (proxyConnectionResult.realTarget) { + remoteName = uriToString(proxyConnectionResult.realTarget); + this.trace('creating HTTP/2 session through proxy to ' + uriToString(proxyConnectionResult.realTarget)); + } else { + remoteName = null; + this.trace('creating HTTP/2 session to ' + subchannelAddressToString(address)); + } + const targetAuthority = getDefaultAuthority( + proxyConnectionResult.realTarget ?? this.channelTarget + ); + let connectionOptions: http2.SecureClientSessionOptions = + credentials._getConnectionOptions() || {}; + connectionOptions.maxSendHeaderBlockLength = Number.MAX_SAFE_INTEGER; + if ('grpc-node.max_session_memory' in options) { + connectionOptions.maxSessionMemory = options[ + 'grpc-node.max_session_memory' + ]; + } else { + /* By default, set a very large max session memory limit, to effectively + * disable enforcement of the limit. Some testing indicates that Node's + * behavior degrades badly when this limit is reached, so we solve that + * by disabling the check entirely. */ + connectionOptions.maxSessionMemory = Number.MAX_SAFE_INTEGER; + } + let addressScheme = 'http://'; + if ('secureContext' in connectionOptions) { + addressScheme = 'https://'; + // If provided, the value of grpc.ssl_target_name_override should be used + // to override the target hostname when checking server identity. + // This option is used for testing only. + if (options['grpc.ssl_target_name_override']) { + const sslTargetNameOverride = options[ + 'grpc.ssl_target_name_override' + ]!; + connectionOptions.checkServerIdentity = ( + host: string, + cert: PeerCertificate + ): Error | undefined => { + return checkServerIdentity(sslTargetNameOverride, cert); + }; + connectionOptions.servername = sslTargetNameOverride; + } else { + const authorityHostname = + splitHostPort(targetAuthority)?.host ?? 'localhost'; + // We want to always set servername to support SNI + connectionOptions.servername = authorityHostname; + } + if (proxyConnectionResult.socket) { + /* This is part of the workaround for + * https://github.com/nodejs/node/issues/32922. Without that bug, + * proxyConnectionResult.socket would always be a plaintext socket and + * this would say + * connectionOptions.socket = proxyConnectionResult.socket; */ + connectionOptions.createConnection = (authority, option) => { + return proxyConnectionResult.socket!; + }; + } + } else { + /* In all but the most recent versions of Node, http2.connect does not use + * the options when establishing plaintext connections, so we need to + * establish that connection explicitly. */ + connectionOptions.createConnection = (authority, option) => { + if (proxyConnectionResult.socket) { + return proxyConnectionResult.socket; + } else { + /* net.NetConnectOpts is declared in a way that is more restrictive + * than what net.connect will actually accept, so we use the type + * assertion to work around that. */ + return net.connect(address); + } + }; + } + + connectionOptions = { + ...connectionOptions, + ...address, + }; + + /* http2.connect uses the options here: + * https://github.com/nodejs/node/blob/70c32a6d190e2b5d7b9ff9d5b6a459d14e8b7d59/lib/internal/http2/core.js#L3028-L3036 + * The spread operator overides earlier values with later ones, so any port + * or host values in the options will be used rather than any values extracted + * from the first argument. In addition, the path overrides the host and port, + * as documented for plaintext connections here: + * https://nodejs.org/api/net.html#net_socket_connect_options_connectlistener + * and for TLS connections here: + * https://nodejs.org/api/tls.html#tls_tls_connect_options_callback. In + * earlier versions of Node, http2.connect passes these options to + * tls.connect but not net.connect, so in the insecure case we still need + * to set the createConnection option above to create the connection + * explicitly. We cannot do that in the TLS case because http2.connect + * passes necessary additional options to tls.connect. + * The first argument just needs to be parseable as a URL and the scheme + * determines whether the connection will be established over TLS or not. + */ + const session = http2.connect( + addressScheme + targetAuthority, + connectionOptions + ); + this.session = session; + session.unref(); + session.once('connect', () => { + session.removeAllListeners(); + resolve(new Http2Transport(session, address, options)); + this.session = null; + }); + session.once('close', () => { + this.session = null; + reject(); + }); + session.once('error', error => { + this.trace('connection failed with error ' + (error as Error).message) + }); + }); + } + connect(address: SubchannelAddress, credentials: ChannelCredentials, options: ChannelOptions): Promise { + if (this.isShutdown) { + return Promise.reject(); + } + /* Pass connection options through to the proxy so that it's able to + * upgrade it's connection to support tls if needed. + * This is a workaround for https://github.com/nodejs/node/issues/32922 + * See https://github.com/grpc/grpc-node/pull/1369 for more info. */ + const connectionOptions: ConnectionOptions = + credentials._getConnectionOptions() || {}; + + if ('secureContext' in connectionOptions) { + connectionOptions.ALPNProtocols = ['h2']; + // If provided, the value of grpc.ssl_target_name_override should be used + // to override the target hostname when checking server identity. + // This option is used for testing only. + if (options['grpc.ssl_target_name_override']) { + const sslTargetNameOverride = options[ + 'grpc.ssl_target_name_override' + ]!; + connectionOptions.checkServerIdentity = ( + host: string, + cert: PeerCertificate + ): Error | undefined => { + return checkServerIdentity(sslTargetNameOverride, cert); + }; + connectionOptions.servername = sslTargetNameOverride; + } else { + if ('grpc.http_connect_target' in options) { + /* This is more or less how servername will be set in createSession + * if a connection is successfully established through the proxy. + * If the proxy is not used, these connectionOptions are discarded + * anyway */ + const targetPath = getDefaultAuthority( + parseUri(options['grpc.http_connect_target'] as string) ?? { + path: 'localhost', + } + ); + const hostPort = splitHostPort(targetPath); + connectionOptions.servername = hostPort?.host ?? targetPath; + } + } + } + + return getProxiedConnection( + address, + options, + connectionOptions + ).then( + result => this.createSession(address, credentials, options, result) + ); + } + + shutdown(): void { + this.isShutdown = true; + this.session?.close(); + this.session = null; + } +} \ No newline at end of file diff --git a/packages/grpc-js/test/test-resolver.ts b/packages/grpc-js/test/test-resolver.ts index a3e6793f..1d458125 100644 --- a/packages/grpc-js/test/test-resolver.ts +++ b/packages/grpc-js/test/test-resolver.ts @@ -207,6 +207,64 @@ describe('Name Resolver', () => { const resolver = resolverManager.createResolver(target, listener, {}); resolver.updateResolution(); }); + // Created DNS TXT record using TXT sample from https://github.com/grpc/proposal/blob/master/A2-service-configs-in-dns.md + // "grpc_config=[{\"serviceConfig\":{\"loadBalancingPolicy\":\"round_robin\",\"methodConfig\":[{\"name\":[{\"service\":\"MyService\",\"method\":\"Foo\"}],\"waitForReady\":true}]}}]" + it.skip('Should resolve a name with TXT service config', done => { + const target = resolverManager.mapUriDefaultScheme(parseUri('grpctest.kleinsch.com')!)!; + const listener: resolverManager.ResolverListener = { + onSuccessfulResolution: ( + addressList: SubchannelAddress[], + serviceConfig: ServiceConfig | null, + serviceConfigError: StatusObject | null + ) => { + if (serviceConfig !== null) { + assert( + serviceConfig.loadBalancingPolicy === 'round_robin', + 'Should have found round robin LB policy' + ); + done(); + } + }, + onError: (error: StatusObject) => { + done(new Error(`Failed with status ${error.details}`)); + }, + }; + const resolver = resolverManager.createResolver(target, listener, {}); + resolver.updateResolution(); + }); + it.skip( + 'Should not resolve TXT service config if we disabled service config', + (done) => { + const target = resolverManager.mapUriDefaultScheme( + parseUri('grpctest.kleinsch.com')! + )!; + let count = 0; + const listener: resolverManager.ResolverListener = { + onSuccessfulResolution: ( + addressList: SubchannelAddress[], + serviceConfig: ServiceConfig | null, + serviceConfigError: StatusObject | null + ) => { + assert( + serviceConfig === null, + 'Should not have found service config' + ); + count++; + }, + onError: (error: StatusObject) => { + done(new Error(`Failed with status ${error.details}`)); + }, + }; + const resolver = resolverManager.createResolver(target, listener, { + 'grpc.service_config_disable_resolution': 1, + }); + resolver.updateResolution(); + setTimeout(() => { + assert(count === 1, 'Should have only resolved once'); + done(); + }, 2_000); + } + ); /* The DNS entry for loopback4.unittest.grpc.io only has a single A record * with the address 127.0.0.1, but the Mac DNS resolver appears to use * NAT64 to create an IPv6 address in that case, so it instead returns diff --git a/packages/grpc-js/test/test-server.ts b/packages/grpc-js/test/test-server.ts index 0c0ba168..c67ebc4d 100644 --- a/packages/grpc-js/test/test-server.ts +++ b/packages/grpc-js/test/test-server.ts @@ -27,9 +27,9 @@ import * as grpc from '../src'; import { Server, ServerCredentials } from '../src'; import { ServiceError } from '../src/call'; import { ServiceClient, ServiceClientConstructor } from '../src/make-client'; -import { sendUnaryData, ServerUnaryCall } from '../src/server-call'; +import { sendUnaryData, ServerUnaryCall, ServerDuplexStream } from '../src/server-call'; -import { loadProtoFile } from './common'; +import { assert2, loadProtoFile } from './common'; import { TestServiceClient, TestServiceHandlers } from './generated/TestService'; import { ProtoGrpcType as TestServiceGrpcType } from './generated/test_service'; import { Request__Output } from './generated/Request'; @@ -458,18 +458,28 @@ describe('Server', () => { describe('Echo service', () => { let server: Server; let client: ServiceClient; + const protoFile = path.join(__dirname, 'fixtures', 'echo_service.proto'); + const echoService = loadProtoFile(protoFile) + .EchoService as ServiceClientConstructor; + + const serviceImplementation = { + echo(call: ServerUnaryCall, callback: sendUnaryData) { + callback(null, call.request); + }, + echoBidiStream(call: ServerDuplexStream) { + call.on('data', data => { + call.write(data); + }); + call.on('end', () => { + call.end(); + }); + } + }; before(done => { - const protoFile = path.join(__dirname, 'fixtures', 'echo_service.proto'); - const echoService = loadProtoFile(protoFile) - .EchoService as ServiceClientConstructor; server = new Server(); - server.addService(echoService.service, { - echo(call: ServerUnaryCall, callback: sendUnaryData) { - callback(null, call.request); - }, - }); + server.addService(echoService.service, serviceImplementation); server.bindAsync( 'localhost:0', @@ -501,6 +511,43 @@ describe('Echo service', () => { } ); }); + + /* This test passes on Node 18 but fails on Node 16. The failure appears to + * be caused by https://github.com/nodejs/node/issues/42713 */ + it.skip('should continue a stream after server shutdown', done => { + const server2 = new Server(); + server2.addService(echoService.service, serviceImplementation); + server2.bindAsync('localhost:0', ServerCredentials.createInsecure(), (err, port) => { + if (err) { + done(err); + return; + } + const client2 = new echoService(`localhost:${port}`, grpc.credentials.createInsecure()); + server2.start(); + const stream = client2.echoBidiStream(); + const totalMessages = 5; + let messagesSent = 0; + stream.write({ value: 'test value', value2: messagesSent}); + messagesSent += 1; + stream.on('data', () => { + if (messagesSent === 1) { + server2.tryShutdown(assert2.mustCall(() => {})); + } + if (messagesSent >= totalMessages) { + stream.end(); + } else { + stream.write({ value: 'test value', value2: messagesSent}); + messagesSent += 1; + } + }); + stream.on('status', assert2.mustCall((status: grpc.StatusObject) => { + assert.strictEqual(status.code, grpc.status.OK); + assert.strictEqual(messagesSent, totalMessages); + })); + stream.on('error', () => {}); + assert2.afterMustCallsSatisfied(done); + }); + }); }); describe('Generic client and server', () => {