From 4361242eb90959fcf23830ea2c7f985018ba7bf0 Mon Sep 17 00:00:00 2001 From: Michael Lumish Date: Wed, 20 May 2020 14:00:05 -0700 Subject: [PATCH 01/24] Add priority load balancer --- .../grpc-js/src/load-balancer-priority.ts | 457 ++++++++++++++++++ packages/grpc-js/src/load-balancer.ts | 2 + 2 files changed, 459 insertions(+) create mode 100644 packages/grpc-js/src/load-balancer-priority.ts diff --git a/packages/grpc-js/src/load-balancer-priority.ts b/packages/grpc-js/src/load-balancer-priority.ts new file mode 100644 index 00000000..94f7f560 --- /dev/null +++ b/packages/grpc-js/src/load-balancer-priority.ts @@ -0,0 +1,457 @@ +/* + * Copyright 2020 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +import { + LoadBalancer, + ChannelControlHelper, + getFirstUsableConfig, + registerLoadBalancerType, +} from './load-balancer'; +import { SubchannelAddress } from './subchannel'; +import { + LoadBalancingConfig, + isPriorityLoadBalancingConfig, +} from './load-balancing-config'; +import { ConnectivityState } from './channel'; +import { Picker, QueuePicker, UnavailablePicker } from './picker'; +import { ChildLoadBalancerHandler } from './load-balancer-child-handler'; +import { ChannelOptions } from './channel-options'; +import { Status } from './constants'; +import { Metadata } from './metadata'; + +const TYPE_NAME = 'priority'; + +const DEFAULT_FAILOVER_TIME_MS = 10_000; +const DEFAULT_RETENTION_INTERVAL_MS = 15 * 60 * 1000; + +export type LocalitySubchannelAddress = SubchannelAddress & { + localityPath: string[]; +}; + +export function isLocalitySubchannelAddress( + address: SubchannelAddress +): address is LocalitySubchannelAddress { + return Array.isArray((address as LocalitySubchannelAddress).localityPath); +} + +interface PriorityChildBalancer { + updateAddressList( + addressList: SubchannelAddress[], + lbConfig: LoadBalancingConfig, + attributes: { [key: string]: unknown } + ): void; + exitIdle(): void; + resetBackoff(): void; + deactivate(): void; + maybeReactivate(): void; + cancelFailoverTimer(): void; + isFailoverTimerPending(): boolean; + getConnectivityState(): ConnectivityState; + getPicker(): Picker; + getName(): string; + destroy(): void; +} + +interface UpdateArgs { + subchannelAddress: SubchannelAddress[]; + lbConfig: LoadBalancingConfig; +} + +export class PriorityLoadBalancer implements LoadBalancer { + /** + * Inner class for holding a child priority and managing associated timers. + */ + private PriorityChildImpl = class implements PriorityChildBalancer { + private connectivityState: ConnectivityState = ConnectivityState.IDLE; + private picker: Picker; + private childBalancer: ChildLoadBalancerHandler; + private failoverTimer: NodeJS.Timer | null = null; + private deactivationTimer: NodeJS.Timer | null = null; + constructor(private parent: PriorityLoadBalancer, private name: string) { + this.childBalancer = new ChildLoadBalancerHandler({ + createSubchannel: ( + subchannelAddress: SubchannelAddress, + subchannelArgs: ChannelOptions + ) => { + return this.parent.channelControlHelper.createSubchannel( + subchannelAddress, + subchannelArgs + ); + }, + updateState: (connectivityState: ConnectivityState, picker: Picker) => { + this.updateState(connectivityState, picker); + }, + requestReresolution: () => { + this.parent.channelControlHelper.requestReresolution(); + }, + }); + this.picker = new QueuePicker(this.childBalancer); + + this.deactivationTimer = setTimeout(() => {}, 0); + clearTimeout(this.deactivationTimer); + } + + private updateState(connectivityState: ConnectivityState, picker: Picker) { + this.connectivityState = connectivityState; + this.picker = picker; + this.parent.onChildStateChange(this); + } + + updateAddressList( + addressList: SubchannelAddress[], + lbConfig: LoadBalancingConfig, + attributes: { [key: string]: unknown } + ): void { + this.childBalancer.updateAddressList(addressList, lbConfig, attributes); + this.failoverTimer = setTimeout(() => { + this.failoverTimer = null; + this.updateState( + ConnectivityState.TRANSIENT_FAILURE, + new UnavailablePicker() + ); + }, DEFAULT_FAILOVER_TIME_MS); + } + + exitIdle() { + this.childBalancer.exitIdle(); + } + + resetBackoff() { + this.childBalancer.resetBackoff(); + } + + deactivate() { + if (this.deactivationTimer === null) { + this.deactivationTimer = setTimeout(() => { + this.parent.deleteChild(this); + this.childBalancer.destroy(); + }, DEFAULT_RETENTION_INTERVAL_MS); + } + } + + maybeReactivate() { + if (this.deactivationTimer !== null) { + clearTimeout(this.deactivationTimer); + this.deactivationTimer = null; + } + } + + cancelFailoverTimer() { + if (this.failoverTimer !== null) { + clearTimeout(this.failoverTimer); + this.failoverTimer = null; + } + } + + isFailoverTimerPending() { + return this.failoverTimer !== null; + } + + getConnectivityState() { + return this.connectivityState; + } + + getPicker() { + return this.picker; + } + + getName() { + return this.name; + } + + destroy() { + this.childBalancer.destroy(); + } + }; + // End of inner class PriorityChildImpl + + private children: Map = new Map< + string, + PriorityChildBalancer + >(); + /** + * The priority order of child names from the latest config update. + */ + private priorities: string[] = []; + /** + * The attributes object from the latest update, saved to be passed along to + * each new child as they are created + */ + private latestAttributes: { [key: string]: unknown } = {}; + /** + * The latest load balancing policies and address lists for each child from + * the latest update + */ + private latestUpdates: Map = new Map< + string, + UpdateArgs + >(); + /** + * Current chosen priority that requests are sent to + */ + private currentPriority: number | null = null; + /** + * After an update, this preserves the currently selected child from before + * the update. We continue to use that child until it disconnects, or + * another higher-priority child connects, or it is deleted because it is not + * in the new priority list at all and its retention interval has expired, or + * we try and fail to connect to every child in the new priority list. + */ + private currentChildFromBeforeUpdate: PriorityChildBalancer | null = null; + + constructor(private channelControlHelper: ChannelControlHelper) {} + + private onChildStateChange(child: PriorityChildBalancer) { + const childState = child.getConnectivityState(); + if (child === this.currentChildFromBeforeUpdate) { + if ( + childState === ConnectivityState.READY || + childState === ConnectivityState.IDLE + ) { + this.channelControlHelper.updateState(childState, child.getPicker()); + } else { + this.currentChildFromBeforeUpdate = null; + this.tryNextPriority(true); + } + return; + } + const childPriority = this.priorities.indexOf(child.getName()); + if (childPriority < 0) { + // child is not in the priority list, ignore updates + return; + } + if (this.currentPriority !== null && childPriority > this.currentPriority) { + // child is lower priority than the currently selected child, ignore updates + return; + } + if (childState === ConnectivityState.TRANSIENT_FAILURE) { + /* Report connecting if and only if the currently selected child is the + * one entering TRANSIENT_FAILURE */ + this.tryNextPriority(childPriority === this.currentPriority); + return; + } + if (this.currentPriority === null || childPriority < this.currentPriority) { + /* In this case, either there is no currently selected child or this + * child is higher priority than the currently selected child, so we want + * to switch to it if it is READY or IDLE. */ + if ( + childState === ConnectivityState.READY || + childState === ConnectivityState.IDLE + ) { + this.selectPriority(childPriority); + } + return; + } + /* The currently selected child has updated state to something other than + * TRANSIENT_FAILURE, so we pass that update along */ + this.channelControlHelper.updateState(childState, child.getPicker()); + } + + private deleteChild(child: PriorityChildBalancer) { + if (child === this.currentChildFromBeforeUpdate) { + this.currentChildFromBeforeUpdate = null; + /* If we get to this point, the currentChildFromBeforeUpdate was still in + * use, so we are still trying to connect to the specified priorities */ + this.tryNextPriority(true); + } + } + + /** + * Select the child at the specified priority, and report that child's state + * as this balancer's state until that child disconnects or a higher-priority + * child connects. + * @param priority + */ + private selectPriority(priority: number) { + this.currentPriority = priority; + const chosenChild = this.children.get(this.priorities[priority])!; + this.channelControlHelper.updateState( + chosenChild.getConnectivityState(), + chosenChild.getPicker() + ); + this.currentChildFromBeforeUpdate = null; + // Deactivate each child of lower priority than the chosen child + for (let i = priority + 1; i < this.priorities.length; i++) { + this.children.get(this.priorities[i])?.deactivate(); + } + } + + /** + * Check each child in priority order until we find one to use + * @param reportConnecting Whether we should report a CONNECTING state if we + * stop before picking a specific child. This should be true when we have + * not already selected a child. + */ + private tryNextPriority(reportConnecting: boolean) { + for (const [index, childName] of this.priorities.entries()) { + let child = this.children.get(childName); + /* If the child doesn't already exist, create it and update it. */ + if (child === undefined) { + if (reportConnecting) { + this.channelControlHelper.updateState( + ConnectivityState.CONNECTING, + new QueuePicker(this) + ); + } + child = new this.PriorityChildImpl(this, childName); + this.children.set(childName, child); + const childUpdate = this.latestUpdates.get(childName); + if (childUpdate !== undefined) { + child.updateAddressList( + childUpdate.subchannelAddress, + childUpdate.lbConfig, + this.latestAttributes + ); + } + } + /* We're going to try to use this child, so reactivate it if it has been + * deactivated */ + child.maybeReactivate(); + if ( + child.getConnectivityState() === ConnectivityState.READY || + child.getConnectivityState() === ConnectivityState.IDLE + ) { + this.selectPriority(index); + return; + } + if (child.isFailoverTimerPending()) { + /* This child is still trying to connect. Wait until its failover timer + * has ended to continue to the next one */ + if (reportConnecting) { + this.channelControlHelper.updateState( + ConnectivityState.CONNECTING, + new QueuePicker(this) + ); + } + return; + } + } + this.currentPriority = null; + this.currentChildFromBeforeUpdate = null; + this.channelControlHelper.updateState( + ConnectivityState.TRANSIENT_FAILURE, + new UnavailablePicker({ + code: Status.UNAVAILABLE, + details: 'No ready priority', + metadata: new Metadata(), + }) + ); + } + + updateAddressList( + addressList: SubchannelAddress[], + lbConfig: LoadBalancingConfig, + attributes: { [key: string]: unknown } + ): void { + if (!isPriorityLoadBalancingConfig(lbConfig)) { + // Reject a config of the wrong type + return; + } + const priorityConfig = lbConfig.priority; + /* For each address, the first element of its localityPath array determines + * which child it belongs to. So we bucket those addresses by that first + * element, and pass along the rest of the localityPath for that child + * to use. */ + const childAddressMap: Map = new Map< + string, + LocalitySubchannelAddress[] + >(); + for (const address of addressList) { + if (!isLocalitySubchannelAddress(address)) { + // Reject address that cannot be prioritized + return; + } + if (address.localityPath.length < 1) { + // Reject address that cannot be prioritized + return; + } + const childName = address.localityPath[0]; + const childAddress: LocalitySubchannelAddress = { + ...address, + localityPath: address.localityPath.slice(1), + }; + let childAddressList = childAddressMap.get(childName); + if (childAddressList === undefined) { + childAddressList = []; + childAddressMap.set(childName, childAddressList); + } + childAddressList.push(childAddress); + } + if (this.currentPriority !== null) { + this.currentChildFromBeforeUpdate = this.children.get( + this.priorities[this.currentPriority] + )!; + this.currentPriority = null; + } + this.latestAttributes = attributes; + this.latestUpdates.clear(); + this.priorities = priorityConfig.priorities; + /* Pair up the new child configs with the corresponding address lists, and + * update all existing children with their new configs */ + for (const [childName, childConfig] of priorityConfig.children.entries()) { + const chosenChildConfig = getFirstUsableConfig(childConfig.config); + if (chosenChildConfig !== null) { + const childAddresses = childAddressMap.get(childName) ?? []; + this.latestUpdates.set(childName, { + subchannelAddress: childAddresses, + lbConfig: chosenChildConfig, + }); + const existingChild = this.children.get(childName); + if (existingChild !== undefined) { + existingChild.updateAddressList( + childAddresses, + chosenChildConfig, + attributes + ); + } + } + } + // Deactivate all children that are no longer in the priority list + for (const [childName, child] of this.children.entries()) { + if (this.priorities.indexOf(childName) < 0) { + child.deactivate(); + } + } + // Only report connecting if there are no existing children + this.tryNextPriority(this.children.size === 0); + } + exitIdle(): void { + if (this.currentPriority !== null) { + this.children.get(this.priorities[this.currentPriority])?.exitIdle(); + } + } + resetBackoff(): void { + for (const child of this.children.values()) { + child.resetBackoff(); + } + } + destroy(): void { + for (const child of this.children.values()) { + child.destroy(); + } + this.children.clear(); + this.currentChildFromBeforeUpdate?.destroy(); + this.currentChildFromBeforeUpdate = null; + } + getTypeName(): string { + return TYPE_NAME; + } +} + +export function setup() { + registerLoadBalancerType(TYPE_NAME, PriorityLoadBalancer); +} diff --git a/packages/grpc-js/src/load-balancer.ts b/packages/grpc-js/src/load-balancer.ts index dc4cf4e3..2e91ffc5 100644 --- a/packages/grpc-js/src/load-balancer.ts +++ b/packages/grpc-js/src/load-balancer.ts @@ -22,6 +22,7 @@ import { Picker } from './picker'; import { LoadBalancingConfig } from './load-balancing-config'; import * as load_balancer_pick_first from './load-balancer-pick-first'; import * as load_balancer_round_robin from './load-balancer-round-robin'; +import * as load_balancer_priority from './load-balancer-priority'; /** * A collection of functions associated with a channel that a load balancer @@ -137,4 +138,5 @@ export function getFirstUsableConfig( export function registerAll() { load_balancer_pick_first.setup(); load_balancer_round_robin.setup(); + load_balancer_priority.setup(); } From be31009d9a084e7c586a763ae2582dc856f80766 Mon Sep 17 00:00:00 2001 From: Michael Lumish Date: Thu, 21 May 2020 10:04:29 -0700 Subject: [PATCH 02/24] Start failover timers when leaving IDLE --- .../grpc-js/src/load-balancer-priority.ts | 51 ++++++++++++------- 1 file changed, 32 insertions(+), 19 deletions(-) diff --git a/packages/grpc-js/src/load-balancer-priority.ts b/packages/grpc-js/src/load-balancer-priority.ts index 94f7f560..94a4ace7 100644 --- a/packages/grpc-js/src/load-balancer-priority.ts +++ b/packages/grpc-js/src/load-balancer-priority.ts @@ -111,22 +111,31 @@ export class PriorityLoadBalancer implements LoadBalancer { this.parent.onChildStateChange(this); } + private startFailoverTimer() { + if (this.failoverTimer === null) { + this.failoverTimer = setTimeout(() => { + this.failoverTimer = null; + this.updateState( + ConnectivityState.TRANSIENT_FAILURE, + new UnavailablePicker() + ); + }, DEFAULT_FAILOVER_TIME_MS); + } + } + updateAddressList( addressList: SubchannelAddress[], lbConfig: LoadBalancingConfig, attributes: { [key: string]: unknown } ): void { this.childBalancer.updateAddressList(addressList, lbConfig, attributes); - this.failoverTimer = setTimeout(() => { - this.failoverTimer = null; - this.updateState( - ConnectivityState.TRANSIENT_FAILURE, - new UnavailablePicker() - ); - }, DEFAULT_FAILOVER_TIME_MS); + this.startFailoverTimer(); } exitIdle() { + if (this.connectivityState === ConnectivityState.IDLE) { + this.startFailoverTimer(); + } this.childBalancer.exitIdle(); } @@ -215,6 +224,16 @@ export class PriorityLoadBalancer implements LoadBalancer { constructor(private channelControlHelper: ChannelControlHelper) {} + private updateState(state: ConnectivityState, picker: Picker) { + /* If switching to IDLE, use a QueuePicker attached to this load balancer + * so that when the picker calls exitIdle, that in turn calls exitIdle on + * the PriorityChildImpl, which will start the failover timer. */ + if (state === ConnectivityState.IDLE) { + picker = new QueuePicker(this); + } + this.channelControlHelper.updateState(state, picker); + } + private onChildStateChange(child: PriorityChildBalancer) { const childState = child.getConnectivityState(); if (child === this.currentChildFromBeforeUpdate) { @@ -222,7 +241,7 @@ export class PriorityLoadBalancer implements LoadBalancer { childState === ConnectivityState.READY || childState === ConnectivityState.IDLE ) { - this.channelControlHelper.updateState(childState, child.getPicker()); + this.updateState(childState, child.getPicker()); } else { this.currentChildFromBeforeUpdate = null; this.tryNextPriority(true); @@ -258,7 +277,7 @@ export class PriorityLoadBalancer implements LoadBalancer { } /* The currently selected child has updated state to something other than * TRANSIENT_FAILURE, so we pass that update along */ - this.channelControlHelper.updateState(childState, child.getPicker()); + this.updateState(childState, child.getPicker()); } private deleteChild(child: PriorityChildBalancer) { @@ -279,7 +298,7 @@ export class PriorityLoadBalancer implements LoadBalancer { private selectPriority(priority: number) { this.currentPriority = priority; const chosenChild = this.children.get(this.priorities[priority])!; - this.channelControlHelper.updateState( + this.updateState( chosenChild.getConnectivityState(), chosenChild.getPicker() ); @@ -302,10 +321,7 @@ export class PriorityLoadBalancer implements LoadBalancer { /* If the child doesn't already exist, create it and update it. */ if (child === undefined) { if (reportConnecting) { - this.channelControlHelper.updateState( - ConnectivityState.CONNECTING, - new QueuePicker(this) - ); + this.updateState(ConnectivityState.CONNECTING, new QueuePicker(this)); } child = new this.PriorityChildImpl(this, childName); this.children.set(childName, child); @@ -332,17 +348,14 @@ export class PriorityLoadBalancer implements LoadBalancer { /* This child is still trying to connect. Wait until its failover timer * has ended to continue to the next one */ if (reportConnecting) { - this.channelControlHelper.updateState( - ConnectivityState.CONNECTING, - new QueuePicker(this) - ); + this.updateState(ConnectivityState.CONNECTING, new QueuePicker(this)); } return; } } this.currentPriority = null; this.currentChildFromBeforeUpdate = null; - this.channelControlHelper.updateState( + this.updateState( ConnectivityState.TRANSIENT_FAILURE, new UnavailablePicker({ code: Status.UNAVAILABLE, From 807d7d510ff67dcb6f7a9846f58a10b19cfff587 Mon Sep 17 00:00:00 2001 From: Michael Lumish Date: Thu, 28 May 2020 11:48:47 -0700 Subject: [PATCH 03/24] grpc-js: Add weighted_target load balancer --- .../src/load-balancer-weighted-target.ts | 327 ++++++++++++++++++ packages/grpc-js/src/load-balancer.ts | 2 + packages/grpc-js/src/load-balancing-config.ts | 23 +- 3 files changed, 351 insertions(+), 1 deletion(-) create mode 100644 packages/grpc-js/src/load-balancer-weighted-target.ts diff --git a/packages/grpc-js/src/load-balancer-weighted-target.ts b/packages/grpc-js/src/load-balancer-weighted-target.ts new file mode 100644 index 00000000..03d0ec70 --- /dev/null +++ b/packages/grpc-js/src/load-balancer-weighted-target.ts @@ -0,0 +1,327 @@ +/* + * Copyright 2020 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +import { LoadBalancer, ChannelControlHelper, getFirstUsableConfig, registerLoadBalancerType } from "./load-balancer"; +import { SubchannelAddress } from "./subchannel"; +import { LoadBalancingConfig, WeightedTarget, isWeightedTargetLoadBalancingConfig } from "./load-balancing-config"; +import { Picker, PickResult, PickArgs, QueuePicker, UnavailablePicker } from "./picker"; +import { ConnectivityState } from "./channel"; +import { ChildLoadBalancerHandler } from "./load-balancer-child-handler"; +import { Status } from "./constants"; +import { Metadata } from "./metadata"; + +const TYPE_NAME = 'weighted_target'; + +const DEFAULT_RETENTION_INTERVAL_MS = 15 * 60 * 1000; + +/* These should be imported from load-balancer-priority.ts or factored into + * a separate file */ + +export type LocalitySubchannelAddress = SubchannelAddress & { + localityPath: string[]; +}; + +export function isLocalitySubchannelAddress( + address: SubchannelAddress +): address is LocalitySubchannelAddress { + return Array.isArray((address as LocalitySubchannelAddress).localityPath); +} + +/** + * Represents a picker and a subinterval of a larger interval used for randomly + * selecting an element of a list of these objects. + */ +interface WeightedPicker { + picker: Picker; + /** + * The exclusive end of the interval associated with this element. The start + * of the interval is implicitly the rangeEnd of the previous element in the + * list, or 0 for the first element in the list. + */ + rangeEnd: number; +} + +class WeightedTargetPicker implements Picker { + private rangeTotal: number; + constructor(private readonly pickerList: WeightedPicker[]) { + this.rangeTotal = pickerList[pickerList.length - 1].rangeEnd; + } + pick(pickArgs: PickArgs): PickResult { + // num | 0 is equivalent to floor(num) + const selection = (Math.random() * this.rangeTotal) | 0; + + /* Binary search for the element of the list such that + * pickerList[index - 1].rangeEnd <= selection < pickerList[index].rangeEnd + */ + let mid = 0; + let startIndex = 0; + let endIndex = this.pickerList.length - 1; + let index = 0; + while (endIndex > startIndex) { + mid = ((startIndex + endIndex) / 2) | 0; + if (this.pickerList[mid].rangeEnd > selection) { + endIndex = mid; + } else if (this.pickerList[mid].rangeEnd < selection) { + startIndex = mid + 1; + } else { + // + 1 here because the range is exclusive at the top end + index = mid + 1; + break; + } + } + if (index === 0) { + index = startIndex; + } + + return this.pickerList[index].picker.pick(pickArgs); + } +} + +interface WeightedChild { + updateAddressList(addressList: SubchannelAddress[], lbConfig: WeightedTarget, attributes: { [key: string]: unknown; }): void; + exitIdle(): void; + resetBackoff(): void; + destroy(): void; + deactivate(): void; + maybeReactivate(): void; + getConnectivityState(): ConnectivityState; + getPicker(): Picker; + getWeight(): number; +} + +export class WeightedTargetLoadBalancer implements LoadBalancer { + private WeightedChildImpl = class implements WeightedChild { + private connectivityState: ConnectivityState = ConnectivityState.IDLE; + private picker: Picker; + private childBalancer: ChildLoadBalancerHandler; + private deactivationTimer: NodeJS.Timer | null = null; + private weight: number = 0; + + constructor(private parent: WeightedTargetLoadBalancer, private name: string) { + this.childBalancer = new ChildLoadBalancerHandler({ + createSubchannel: (subchannelAddress, subchannelOptions) => { + return this.parent.channelControlHelper.createSubchannel(subchannelAddress, subchannelOptions); + }, + updateState: (connectivityState, picker) => { + this.updateState(connectivityState, picker); + }, + requestReresolution: () => { + this.parent.channelControlHelper.requestReresolution(); + } + }); + + this.picker = new QueuePicker(this.childBalancer); + } + + private updateState(connectivityState: ConnectivityState, picker: Picker) { + this.connectivityState = connectivityState; + this.picker = picker; + this.parent.updateState(); + } + + updateAddressList(addressList: SubchannelAddress[], lbConfig: WeightedTarget, attributes: { [key: string]: unknown; }): void { + this.weight = lbConfig.weight; + const childConfig = getFirstUsableConfig(lbConfig.child_policy); + if (childConfig !== null) { + this.childBalancer.updateAddressList(addressList, childConfig, attributes); + } + } + exitIdle(): void { + this.childBalancer.exitIdle(); + } + resetBackoff(): void { + this.childBalancer.resetBackoff(); + } + destroy(): void { + this.childBalancer.destroy(); + if (this.deactivationTimer !== null) { + clearTimeout(this.deactivationTimer); + } + } + deactivate(): void { + if (this.deactivationTimer === null) { + this.deactivationTimer = setTimeout(() => { + this.parent.targets.delete(this.name); + this.deactivationTimer = null; + }, DEFAULT_RETENTION_INTERVAL_MS); + } + } + maybeReactivate(): void { + if (this.deactivationTimer !== null) { + clearTimeout(this.deactivationTimer); + this.deactivationTimer = null; + } + } + getConnectivityState(): ConnectivityState { + return this.connectivityState; + } + getPicker(): Picker { + return this.picker; + } + getWeight(): number { + return this.weight; + } + } + // end of WeightedChildImpl + + /** + * Map of target names to target children. Includes current targets and + * previous targets with deactivation timers that have not yet triggered. + */ + private targets: Map = new Map(); + /** + * List of current target names. + */ + private targetList: string[] = []; + + constructor(private channelControlHelper: ChannelControlHelper) {} + + private updateState() { + const pickerList: WeightedPicker[] = []; + let end = 0; + + let connectingCount = 0; + let idleCount = 0; + let transientFailureCount = 0; + for (const targetName of this.targetList) { + const target = this.targets.get(targetName); + if (target === undefined) { + continue; + } + switch (target.getConnectivityState()) { + case ConnectivityState.READY: + end += target.getWeight(); + pickerList.push({ + picker: target.getPicker(), + rangeEnd: end + }); + break; + case ConnectivityState.CONNECTING: + connectingCount += 1; + break; + case ConnectivityState.IDLE: + idleCount += 1; + break; + case ConnectivityState.TRANSIENT_FAILURE: + transientFailureCount += 1; + break; + default: + // Ignore the other possiblity, SHUTDOWN + } + } + + let connectivityState: ConnectivityState; + if (pickerList.length > 0) { + connectivityState = ConnectivityState.READY; + } else if (connectingCount > 0) { + connectivityState = ConnectivityState.CONNECTING; + } else if (idleCount > 0) { + connectivityState = ConnectivityState.IDLE; + } else { + connectivityState = ConnectivityState.TRANSIENT_FAILURE; + } + + let picker: Picker; + switch (connectivityState) { + case ConnectivityState.READY: + picker = new WeightedTargetPicker(pickerList); + break; + case ConnectivityState.CONNECTING: + case ConnectivityState.READY: + picker = new QueuePicker(this); + break; + default: + picker = new UnavailablePicker({ + code: Status.UNAVAILABLE, + details: 'weighted_target: all children report state TRANSIENT_FAILURE', + metadata: new Metadata() + }); + } + this.channelControlHelper.updateState(connectivityState, picker); + } + + updateAddressList(addressList: SubchannelAddress[], lbConfig: LoadBalancingConfig, attributes: { [key: string]: unknown; }): void { + if (!isWeightedTargetLoadBalancingConfig(lbConfig)) { + // Reject a config of the wrong type + return; + } + + /* For each address, the first element of its localityPath array determines + * which child it belongs to. So we bucket those addresses by that first + * element, and pass along the rest of the localityPath for that child + * to use. */ + const childAddressMap = new Map(); + for (const address of addressList) { + if (!isLocalitySubchannelAddress(address)) { + // Reject address that cannot be associated with targets + return; + } + if (address.localityPath.length < 1) { + // Reject address that cannot be associated with targets + return; + } + const childName = address.localityPath[0]; + const childAddress: LocalitySubchannelAddress = { + ...address, + localityPath: address.localityPath.slice(1), + }; + let childAddressList = childAddressMap.get(childName); + if (childAddressList === undefined) { + childAddressList = []; + childAddressMap.set(childName, childAddressList); + } + childAddressList.push(childAddress); + } + + this.targetList = Array.from(lbConfig.weighted_target.targets.keys()); + for (const [targetName, targetConfig] of lbConfig.weighted_target.targets) { + let target = this.targets.get(targetName); + if (target === undefined) { + target = new this.WeightedChildImpl(this, targetName); + this.targets.set(targetName, target); + } else { + target.maybeReactivate(); + } + target.updateAddressList(childAddressMap.get(targetName) ?? [], targetConfig, attributes); + } + + this.updateState(); + } + exitIdle(): void { + for (const targetName of this.targetList) { + this.targets.get(targetName)?.exitIdle(); + } + } + resetBackoff(): void { + for (const targetName of this.targetList) { + this.targets.get(targetName)?.resetBackoff(); + } + } + destroy(): void { + for (const target of this.targets.values()) { + target.destroy(); + } + this.targets.clear(); + } + getTypeName(): string { + return TYPE_NAME; + } +} + +export function setup() { + registerLoadBalancerType(TYPE_NAME, WeightedTargetLoadBalancer); +} \ No newline at end of file diff --git a/packages/grpc-js/src/load-balancer.ts b/packages/grpc-js/src/load-balancer.ts index dc4cf4e3..9bf50f38 100644 --- a/packages/grpc-js/src/load-balancer.ts +++ b/packages/grpc-js/src/load-balancer.ts @@ -22,6 +22,7 @@ import { Picker } from './picker'; import { LoadBalancingConfig } from './load-balancing-config'; import * as load_balancer_pick_first from './load-balancer-pick-first'; import * as load_balancer_round_robin from './load-balancer-round-robin'; +import * as load_balancer_weighted_target from './load-balancer-weighted-target'; /** * A collection of functions associated with a channel that a load balancer @@ -137,4 +138,5 @@ export function getFirstUsableConfig( export function registerAll() { load_balancer_pick_first.setup(); load_balancer_round_robin.setup(); + load_balancer_weighted_target.setup(); } diff --git a/packages/grpc-js/src/load-balancing-config.ts b/packages/grpc-js/src/load-balancing-config.ts index 0a06d5f8..736e3d42 100644 --- a/packages/grpc-js/src/load-balancing-config.ts +++ b/packages/grpc-js/src/load-balancing-config.ts @@ -48,6 +48,15 @@ export interface PriorityLbConfig { priorities: string[]; } +export interface WeightedTarget { + weight: number; + child_policy: LoadBalancingConfig[]; +} + +export interface WeightedTargetLbConfig { + targets: Map; +} + export interface PickFirstLoadBalancingConfig { name: 'pick_first'; pick_first: PickFirstConfig; @@ -73,12 +82,18 @@ export interface PriorityLoadBalancingConfig { priority: PriorityLbConfig; } +export interface WeightedTargetLoadBalancingConfig { + name: 'weighted_target'; + weighted_target: WeightedTargetLbConfig; +} + export type LoadBalancingConfig = | PickFirstLoadBalancingConfig | RoundRobinLoadBalancingConfig | XdsLoadBalancingConfig | GrpcLbLoadBalancingConfig - | PriorityLoadBalancingConfig; + | PriorityLoadBalancingConfig + | WeightedTargetLoadBalancingConfig; export function isRoundRobinLoadBalancingConfig( lbconfig: LoadBalancingConfig @@ -104,6 +119,12 @@ export function isPriorityLoadBalancingConfig( return lbconfig.name === 'priority'; } +export function isWeightedTargetLoadBalancingConfig( + lbconfig: LoadBalancingConfig +): lbconfig is WeightedTargetLoadBalancingConfig { + return lbconfig.name === 'weighted_target'; +} + /* In these functions we assume the input came from a JSON object. Therefore we * expect that the prototype is uninteresting and that `in` can be used * effectively */ From a61dfb1527d028c6cf14114fe29ff10b3fa20eb4 Mon Sep 17 00:00:00 2001 From: Michael Lumish Date: Thu, 28 May 2020 11:52:02 -0700 Subject: [PATCH 04/24] Some cleanup and fixes --- packages/grpc-js/src/load-balancer-priority.ts | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/packages/grpc-js/src/load-balancer-priority.ts b/packages/grpc-js/src/load-balancer-priority.ts index 94a4ace7..d3070715 100644 --- a/packages/grpc-js/src/load-balancer-priority.ts +++ b/packages/grpc-js/src/load-balancer-priority.ts @@ -100,9 +100,6 @@ export class PriorityLoadBalancer implements LoadBalancer { }, }); this.picker = new QueuePicker(this.childBalancer); - - this.deactivationTimer = setTimeout(() => {}, 0); - clearTimeout(this.deactivationTimer); } private updateState(connectivityState: ConnectivityState, picker: Picker) { @@ -415,7 +412,7 @@ export class PriorityLoadBalancer implements LoadBalancer { this.priorities = priorityConfig.priorities; /* Pair up the new child configs with the corresponding address lists, and * update all existing children with their new configs */ - for (const [childName, childConfig] of priorityConfig.children.entries()) { + for (const [childName, childConfig] of priorityConfig.children) { const chosenChildConfig = getFirstUsableConfig(childConfig.config); if (chosenChildConfig !== null) { const childAddresses = childAddressMap.get(childName) ?? []; @@ -434,7 +431,7 @@ export class PriorityLoadBalancer implements LoadBalancer { } } // Deactivate all children that are no longer in the priority list - for (const [childName, child] of this.children.entries()) { + for (const [childName, child] of this.children) { if (this.priorities.indexOf(childName) < 0) { child.deactivate(); } From 4f38f1e92fd08687802f0b59f244576b07eb1d3a Mon Sep 17 00:00:00 2001 From: Michael Lumish Date: Fri, 29 May 2020 12:35:25 -0700 Subject: [PATCH 05/24] Deactivate targets that are not in new configs --- packages/grpc-js/src/load-balancer-weighted-target.ts | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/packages/grpc-js/src/load-balancer-weighted-target.ts b/packages/grpc-js/src/load-balancer-weighted-target.ts index 03d0ec70..e9756e6a 100644 --- a/packages/grpc-js/src/load-balancer-weighted-target.ts +++ b/packages/grpc-js/src/load-balancer-weighted-target.ts @@ -299,6 +299,13 @@ export class WeightedTargetLoadBalancer implements LoadBalancer { target.updateAddressList(childAddressMap.get(targetName) ?? [], targetConfig, attributes); } + // Deactivate targets that are not in the new config + for (const [targetName, target] of this.targets) { + if (this.targetList.indexOf(targetName) < 0) { + target.deactivate(); + } + } + this.updateState(); } exitIdle(): void { From 6701f19f5e1d16e59202b9dd32b2e5b974f062d6 Mon Sep 17 00:00:00 2001 From: Michael Lumish Date: Mon, 8 Jun 2020 14:05:10 -0700 Subject: [PATCH 06/24] grpc-js: Consistently set servername connection option to support SNI --- packages/grpc-js/src/subchannel.ts | 30 +++++++++++++++++++++++++----- 1 file changed, 25 insertions(+), 5 deletions(-) diff --git a/packages/grpc-js/src/subchannel.ts b/packages/grpc-js/src/subchannel.ts index 1d48604a..bda63b6b 100644 --- a/packages/grpc-js/src/subchannel.ts +++ b/packages/grpc-js/src/subchannel.ts @@ -28,7 +28,7 @@ import * as logging from './logging'; import { LogVerbosity } from './constants'; import { getProxiedConnection, ProxyConnectionResult } from './http_proxy'; import * as net from 'net'; -import { GrpcUri } from './uri-parser'; +import { GrpcUri, parseUri } from './uri-parser'; import { ConnectionOptions } from 'tls'; import { FilterFactory, Filter } from './filter'; @@ -286,6 +286,9 @@ export class Subchannel { } private createSession(proxyConnectionResult: ProxyConnectionResult) { + const targetAuthority = getDefaultAuthority( + proxyConnectionResult.realTarget ?? this.channelTarget + ); let connectionOptions: http2.SecureClientSessionOptions = this.credentials._getConnectionOptions() || {}; let addressScheme = 'http://'; @@ -305,8 +308,16 @@ export class Subchannel { return checkServerIdentity(sslTargetNameOverride, cert); }; connectionOptions.servername = sslTargetNameOverride; + } else { + // We want to always set servername to support SNI + connectionOptions.servername = targetAuthority; } if (proxyConnectionResult.socket) { + /* This is part of the workaround for + * https://github.com/nodejs/node/issues/32922. Without that bug, + * proxyConnectionResult.socket would always be a plaintext socket and + * this would say + * connectionOptions.socket = proxyConnectionResult.socket; */ connectionOptions.createConnection = (authority, option) => { return proxyConnectionResult.socket!; }; @@ -350,10 +361,7 @@ export class Subchannel { * determines whether the connection will be established over TLS or not. */ const session = http2.connect( - addressScheme + - getDefaultAuthority( - proxyConnectionResult.realTarget ?? this.channelTarget - ), + addressScheme + targetAuthority, connectionOptions ); this.session = session; @@ -446,6 +454,18 @@ export class Subchannel { return checkServerIdentity(sslTargetNameOverride, cert); }; connectionOptions.servername = sslTargetNameOverride; + } else { + if ('grpc.http_connect_target' in this.options) { + /* This is more or less how servername will be set in createSession + * if a connection is successfully established through the proxy. + * If the proxy is not used, these connectionOptions are discarded + * anyway */ + connectionOptions.servername = getDefaultAuthority( + parseUri(this.options['grpc.http_connect_target'] as string) ?? { + path: 'localhost', + } + ); + } } } From cb9f96126f2c8f720498a2488af991569a7655a0 Mon Sep 17 00:00:00 2001 From: Michael Lumish Date: Mon, 8 Jun 2020 14:44:14 -0700 Subject: [PATCH 07/24] grpc-js: server: cull closed sessions from list, check for closed in tryShutdown --- packages/grpc-js/src/server.ts | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/packages/grpc-js/src/server.ts b/packages/grpc-js/src/server.ts index a6edee3b..22fb3c72 100644 --- a/packages/grpc-js/src/server.ts +++ b/packages/grpc-js/src/server.ts @@ -500,10 +500,12 @@ export class Server { } } - // If any sessions are active, close them gracefully. pendingChecks += this.sessions.size; this.sessions.forEach((session) => { - session.close(maybeCallback); + if (!session.closed) { + pendingChecks += 1; + session.close(maybeCallback); + } }); if (pendingChecks === 0) { callback(); @@ -608,6 +610,10 @@ export class Server { } this.sessions.add(session); + + session.on('close', () => { + this.sessions.delete(session); + }); }); } } From d9b7b098a7eb3989a23c6782f327230f566cd57f Mon Sep 17 00:00:00 2001 From: Michael Lumish Date: Mon, 8 Jun 2020 16:36:12 -0700 Subject: [PATCH 08/24] grpc-js: Export propagate constants for type parity with grpc --- packages/grpc-js/src/constants.ts | 14 +++++++++++++- packages/grpc-js/src/index.ts | 3 ++- 2 files changed, 15 insertions(+), 2 deletions(-) diff --git a/packages/grpc-js/src/constants.ts b/packages/grpc-js/src/constants.ts index 55d93403..e760658d 100644 --- a/packages/grpc-js/src/constants.ts +++ b/packages/grpc-js/src/constants.ts @@ -41,8 +41,20 @@ export enum LogVerbosity { ERROR, } +/** + * NOTE: This enum is not currently used in any implemented API in this + * library. It is included only for type parity with the other implementation. + */ +export enum Propagate { + DEADLINE = 1, + CENSUS_STATS_CONTEXT = 2, + CENSUS_TRACING_CONTEXT = 4, + CANCELLATION = 8, + DEFAULTS = 65536, +} + // -1 means unlimited export const DEFAULT_MAX_SEND_MESSAGE_LENGTH = -1; // 4 MB default -export const DEFAULT_MAX_RECEIVE_MESSAGE_LENGTH = 4 * 1024 * 1024; \ No newline at end of file +export const DEFAULT_MAX_RECEIVE_MESSAGE_LENGTH = 4 * 1024 * 1024; diff --git a/packages/grpc-js/src/index.ts b/packages/grpc-js/src/index.ts index 9fc89f2e..f79a369b 100644 --- a/packages/grpc-js/src/index.ts +++ b/packages/grpc-js/src/index.ts @@ -36,7 +36,7 @@ import { CallProperties, UnaryCallback, } from './client'; -import { LogVerbosity, Status } from './constants'; +import { LogVerbosity, Status, Propagate } from './constants'; import * as logging from './logging'; import { Deserialize, @@ -127,6 +127,7 @@ export { LogVerbosity as logVerbosity, Status as status, ConnectivityState as connectivityState, + Propagate as propagate, // TODO: Other constants as well }; From 668b5aeb5a9505267b471ca7712d6ab344f7671a Mon Sep 17 00:00:00 2001 From: Michael Lumish Date: Tue, 9 Jun 2020 10:40:53 -0700 Subject: [PATCH 09/24] Consolidate LocalitySubchannelAddress definitions --- .../grpc-js/src/load-balancer-weighted-target.ts | 14 +------------- 1 file changed, 1 insertion(+), 13 deletions(-) diff --git a/packages/grpc-js/src/load-balancer-weighted-target.ts b/packages/grpc-js/src/load-balancer-weighted-target.ts index e9756e6a..04145914 100644 --- a/packages/grpc-js/src/load-balancer-weighted-target.ts +++ b/packages/grpc-js/src/load-balancer-weighted-target.ts @@ -23,24 +23,12 @@ import { ConnectivityState } from "./channel"; import { ChildLoadBalancerHandler } from "./load-balancer-child-handler"; import { Status } from "./constants"; import { Metadata } from "./metadata"; +import { isLocalitySubchannelAddress, LocalitySubchannelAddress } from "./load-balancer-priority"; const TYPE_NAME = 'weighted_target'; const DEFAULT_RETENTION_INTERVAL_MS = 15 * 60 * 1000; -/* These should be imported from load-balancer-priority.ts or factored into - * a separate file */ - -export type LocalitySubchannelAddress = SubchannelAddress & { - localityPath: string[]; -}; - -export function isLocalitySubchannelAddress( - address: SubchannelAddress -): address is LocalitySubchannelAddress { - return Array.isArray((address as LocalitySubchannelAddress).localityPath); -} - /** * Represents a picker and a subinterval of a larger interval used for randomly * selecting an element of a list of these objects. From f4853c13f7efe7a543c9242b3cbb555e30114960 Mon Sep 17 00:00:00 2001 From: Michael Lumish Date: Tue, 9 Jun 2020 11:07:20 -0700 Subject: [PATCH 10/24] Don't double count sessions when closing --- packages/grpc-js/src/server.ts | 1 - 1 file changed, 1 deletion(-) diff --git a/packages/grpc-js/src/server.ts b/packages/grpc-js/src/server.ts index 22fb3c72..ebd4504d 100644 --- a/packages/grpc-js/src/server.ts +++ b/packages/grpc-js/src/server.ts @@ -500,7 +500,6 @@ export class Server { } } - pendingChecks += this.sessions.size; this.sessions.forEach((session) => { if (!session.closed) { pendingChecks += 1; From e8f7fb5cbd6dfc08393e87e1aba4223e544762e4 Mon Sep 17 00:00:00 2001 From: Ben Sykes Date: Mon, 15 Jun 2020 09:18:44 -0700 Subject: [PATCH 11/24] Update node-pre-gyp to pickup fix for #1362 node-pre-gyp 0.12.0 uses needle 2.4.1 which has the bug in it. Even with grpc 1.24.3, which refers to the updated version, it seems npm can decide to use the older version referenced by this package. --- packages/grpc-tools/package.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/grpc-tools/package.json b/packages/grpc-tools/package.json index c7bb2794..67f6af4d 100644 --- a/packages/grpc-tools/package.json +++ b/packages/grpc-tools/package.json @@ -24,7 +24,7 @@ "prepublishOnly": "git submodule update --init --recursive && node copy_well_known_protos.js" }, "dependencies": { - "node-pre-gyp": "^0.12.0" + "node-pre-gyp": "^0.15.0" }, "binary": { "module_name": "grpc_tools", From 01dbc34eb1ed96438c6d30236c6341aae1d0e708 Mon Sep 17 00:00:00 2001 From: Michael Lumish Date: Tue, 16 Jun 2020 11:30:51 -0700 Subject: [PATCH 12/24] grpc-js: Add port to :authority, leave it out of service_url --- packages/grpc-js/src/call-credentials-filter.ts | 4 +++- packages/grpc-js/src/resolver-dns.ts | 7 +------ 2 files changed, 4 insertions(+), 7 deletions(-) diff --git a/packages/grpc-js/src/call-credentials-filter.ts b/packages/grpc-js/src/call-credentials-filter.ts index d39af832..a5459b53 100644 --- a/packages/grpc-js/src/call-credentials-filter.ts +++ b/packages/grpc-js/src/call-credentials-filter.ts @@ -20,6 +20,7 @@ import { Channel } from './channel'; import { BaseFilter, Filter, FilterFactory } from './filter'; import { Metadata } from './metadata'; import { Status } from './constants'; +import { splitHostPort } from './uri-parser'; export class CallCredentialsFilter extends BaseFilter implements Filter { private serviceUrl: string; @@ -38,9 +39,10 @@ export class CallCredentialsFilter extends BaseFilter implements Filter { if (splitPath.length >= 2) { serviceName = splitPath[1]; } + const hostname = splitHostPort(stream.getHost()).host; /* Currently, call credentials are only allowed on HTTPS connections, so we * can assume that the scheme is "https" */ - this.serviceUrl = `https://${stream.getHost()}/${serviceName}`; + this.serviceUrl = `https://${hostname}/${serviceName}`; } async sendMetadata(metadata: Promise): Promise { diff --git a/packages/grpc-js/src/resolver-dns.ts b/packages/grpc-js/src/resolver-dns.ts index 016f3bd1..8a7f4f22 100644 --- a/packages/grpc-js/src/resolver-dns.ts +++ b/packages/grpc-js/src/resolver-dns.ts @@ -269,12 +269,7 @@ class DnsResolver implements Resolver { * @param target */ static getDefaultAuthority(target: GrpcUri): string { - const hostPort = splitHostPort(target.path); - if (hostPort !== null) { - return hostPort.host; - } else { - throw new Error(`Failed to parse target ${uriToString(target)}`); - } + return target.path; } } From f97e27f0c05f325d18d86115d3998cc3755bc7df Mon Sep 17 00:00:00 2001 From: Michael Lumish Date: Tue, 16 Jun 2020 11:40:22 -0700 Subject: [PATCH 13/24] Fix possible null reference --- packages/grpc-js/src/call-credentials-filter.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/grpc-js/src/call-credentials-filter.ts b/packages/grpc-js/src/call-credentials-filter.ts index a5459b53..5263d97f 100644 --- a/packages/grpc-js/src/call-credentials-filter.ts +++ b/packages/grpc-js/src/call-credentials-filter.ts @@ -39,7 +39,7 @@ export class CallCredentialsFilter extends BaseFilter implements Filter { if (splitPath.length >= 2) { serviceName = splitPath[1]; } - const hostname = splitHostPort(stream.getHost()).host; + const hostname = splitHostPort(stream.getHost())?.host ?? 'localhost'; /* Currently, call credentials are only allowed on HTTPS connections, so we * can assume that the scheme is "https" */ this.serviceUrl = `https://${hostname}/${serviceName}`; From ece7d0f56d91903fcf935cdc64eecb5ff5ce11df Mon Sep 17 00:00:00 2001 From: Michael Lumish Date: Tue, 16 Jun 2020 13:36:20 -0700 Subject: [PATCH 14/24] grpc-js: Don't initiate a read after receiving a message --- packages/grpc-js/src/client.ts | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/packages/grpc-js/src/client.ts b/packages/grpc-js/src/client.ts index 8ebb2537..5f78ffe5 100644 --- a/packages/grpc-js/src/client.ts +++ b/packages/grpc-js/src/client.ts @@ -558,9 +558,7 @@ export class Client { }, // eslint-disable-next-line @typescript-eslint/no-explicit-any onReceiveMessage(message: any) { - if (stream.push(message)) { - call.startRead(); - } + stream.push(message); }, onReceiveStatus(status: StatusObject) { if (receivedStatus) { @@ -656,9 +654,7 @@ export class Client { stream.emit('metadata', metadata); }, onReceiveMessage(message: Buffer) { - if (stream.push(message)) { - call.startRead(); - } + stream.push(message) }, onReceiveStatus(status: StatusObject) { if (receivedStatus) { From b825055163aabe7658a5f4436f2cda369ce30c57 Mon Sep 17 00:00:00 2001 From: Michael Lumish Date: Tue, 16 Jun 2020 13:45:09 -0700 Subject: [PATCH 15/24] Add test for long stream --- test/api/interop_extra_test.js | 22 ++++++++++++++++++++++ 1 file changed, 22 insertions(+) diff --git a/test/api/interop_extra_test.js b/test/api/interop_extra_test.js index b798f1a6..d97340bf 100644 --- a/test/api/interop_extra_test.js +++ b/test/api/interop_extra_test.js @@ -144,6 +144,28 @@ describe(`${anyGrpc.clientName} client -> ${anyGrpc.serverName} server`, functio done(); }); }); + it('should receive all messages in a long stream', function() { + var arg = { + response_type: 'COMPRESSABLE', + response_parameters: [ + ] + }; + for (let i = 0; i < 100_000; i++) { + arg.response_parameters.push({size: 0}); + } + var call = client.streamingOutputCall(arg); + let responseCount = 0; + call.on('data', (value) => { + responseCount++; + }); + call.on('end', () => { + assert.strictEqual(responseCount, arg.response_parameters.length); + done(); + }); + call.on('status', function(status) { + assert.strictEqual(status.code, grpc.status.OK); + }); + }); describe('max message size', function() { // A size that is larger than the default limit const largeMessageSize = 8 * 1024 * 1024; From 8a4a9b3235a0dcab241bf35bb53ef64820b89f60 Mon Sep 17 00:00:00 2001 From: Michael Lumish Date: Tue, 16 Jun 2020 14:05:58 -0700 Subject: [PATCH 16/24] Underscore in numbers is too new for some Node versions --- test/api/interop_extra_test.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/api/interop_extra_test.js b/test/api/interop_extra_test.js index d97340bf..1f31fb41 100644 --- a/test/api/interop_extra_test.js +++ b/test/api/interop_extra_test.js @@ -150,7 +150,7 @@ describe(`${anyGrpc.clientName} client -> ${anyGrpc.serverName} server`, functio response_parameters: [ ] }; - for (let i = 0; i < 100_000; i++) { + for (let i = 0; i < 100000; i++) { arg.response_parameters.push({size: 0}); } var call = client.streamingOutputCall(arg); From 68bc74d0bd11d90bde1a8fff72d411aed699bbad Mon Sep 17 00:00:00 2001 From: Michael Lumish Date: Tue, 16 Jun 2020 15:02:08 -0700 Subject: [PATCH 17/24] Rearrange new test slightly --- test/api/interop_extra_test.js | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/test/api/interop_extra_test.js b/test/api/interop_extra_test.js index 1f31fb41..427c93f5 100644 --- a/test/api/interop_extra_test.js +++ b/test/api/interop_extra_test.js @@ -158,12 +158,13 @@ describe(`${anyGrpc.clientName} client -> ${anyGrpc.serverName} server`, functio call.on('data', (value) => { responseCount++; }); - call.on('end', () => { + call.on('status', (status) => { + assert.strictEqual(status.code, grpc.status.OK); assert.strictEqual(responseCount, arg.response_parameters.length); done(); }); - call.on('status', function(status) { - assert.strictEqual(status.code, grpc.status.OK); + call.on('error', (error) => { + assert.ifError(error); }); }); describe('max message size', function() { From f50ed7c2236ccadc36ae699b1a00b99de0fe6d38 Mon Sep 17 00:00:00 2001 From: Michael Lumish Date: Wed, 17 Jun 2020 09:33:16 -0700 Subject: [PATCH 18/24] Make the new test actually pass --- test/api/interop_extra_test.js | 5 +++-- test/interop/async_delay_queue.js | 8 ++++++-- 2 files changed, 9 insertions(+), 4 deletions(-) diff --git a/test/api/interop_extra_test.js b/test/api/interop_extra_test.js index 427c93f5..f94b0752 100644 --- a/test/api/interop_extra_test.js +++ b/test/api/interop_extra_test.js @@ -51,7 +51,7 @@ function echoMetadataGenerator(options, callback) { const credentials = grpc.credentials.createFromMetadataGenerator(echoMetadataGenerator); -describe(`${anyGrpc.clientName} client -> ${anyGrpc.serverName} server`, function() { +describe.only(`${anyGrpc.clientName} client -> ${anyGrpc.serverName} server`, function() { describe('Interop-adjacent tests', function() { let server; let client; @@ -144,7 +144,8 @@ describe(`${anyGrpc.clientName} client -> ${anyGrpc.serverName} server`, functio done(); }); }); - it('should receive all messages in a long stream', function() { + it('should receive all messages in a long stream', function(done) { + this.timeout(20000); var arg = { response_type: 'COMPRESSABLE', response_parameters: [ diff --git a/test/interop/async_delay_queue.js b/test/interop/async_delay_queue.js index 43ac5738..f9a39b59 100644 --- a/test/interop/async_delay_queue.js +++ b/test/interop/async_delay_queue.js @@ -39,9 +39,13 @@ AsyncDelayQueue.prototype.runNext = function() { var continueCallback = _.bind(this.runNext, this); if (next) { this.callback_pending = true; - setTimeout(function() { + if (next.delay === 0) { next.callback(continueCallback); - }, next.delay); + } else { + setTimeout(function() { + next.callback(continueCallback); + }, next.delay); + } } else { this.callback_pending = false; } From 25dfe88fb34222d9856c3f76ae6055560a873b95 Mon Sep 17 00:00:00 2001 From: Michael Lumish Date: Wed, 17 Jun 2020 11:25:26 -0700 Subject: [PATCH 19/24] grpc-js: bump to 1.1.0 --- packages/grpc-js/package.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/grpc-js/package.json b/packages/grpc-js/package.json index 3d17f9a3..1a242012 100644 --- a/packages/grpc-js/package.json +++ b/packages/grpc-js/package.json @@ -1,6 +1,6 @@ { "name": "@grpc/grpc-js", - "version": "1.0.5", + "version": "1.1.0", "description": "gRPC Library for Node - pure JS implementation", "homepage": "https://grpc.io/", "repository": "https://github.com/grpc/grpc-node/tree/master/packages/grpc-js", From a34973b469f5a7ac6bd6e37a8fc12efe9d8c1faf Mon Sep 17 00:00:00 2001 From: Michael Lumish Date: Wed, 17 Jun 2020 11:32:48 -0700 Subject: [PATCH 20/24] Remove 'only' that was left over from test fixes --- test/api/interop_extra_test.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/api/interop_extra_test.js b/test/api/interop_extra_test.js index f94b0752..67130e03 100644 --- a/test/api/interop_extra_test.js +++ b/test/api/interop_extra_test.js @@ -51,7 +51,7 @@ function echoMetadataGenerator(options, callback) { const credentials = grpc.credentials.createFromMetadataGenerator(echoMetadataGenerator); -describe.only(`${anyGrpc.clientName} client -> ${anyGrpc.serverName} server`, function() { +describe(`${anyGrpc.clientName} client -> ${anyGrpc.serverName} server`, function() { describe('Interop-adjacent tests', function() { let server; let client; From cd8743e5699308973e7f8c41218e13772b1f14fb Mon Sep 17 00:00:00 2001 From: Michael Lumish Date: Fri, 19 Jun 2020 08:56:48 -0700 Subject: [PATCH 21/24] Omit port number from servername option --- packages/grpc-js/package.json | 2 +- packages/grpc-js/src/subchannel.ts | 5 +++-- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/packages/grpc-js/package.json b/packages/grpc-js/package.json index 1a242012..da8e28ae 100644 --- a/packages/grpc-js/package.json +++ b/packages/grpc-js/package.json @@ -1,6 +1,6 @@ { "name": "@grpc/grpc-js", - "version": "1.1.0", + "version": "1.1.1", "description": "gRPC Library for Node - pure JS implementation", "homepage": "https://grpc.io/", "repository": "https://github.com/grpc/grpc-node/tree/master/packages/grpc-js", diff --git a/packages/grpc-js/src/subchannel.ts b/packages/grpc-js/src/subchannel.ts index bda63b6b..bbaa1ce1 100644 --- a/packages/grpc-js/src/subchannel.ts +++ b/packages/grpc-js/src/subchannel.ts @@ -28,7 +28,7 @@ import * as logging from './logging'; import { LogVerbosity } from './constants'; import { getProxiedConnection, ProxyConnectionResult } from './http_proxy'; import * as net from 'net'; -import { GrpcUri, parseUri } from './uri-parser'; +import { GrpcUri, parseUri, splitHostPort } from './uri-parser'; import { ConnectionOptions } from 'tls'; import { FilterFactory, Filter } from './filter'; @@ -309,8 +309,9 @@ export class Subchannel { }; connectionOptions.servername = sslTargetNameOverride; } else { + const authorityHostname = splitHostPort(targetAuthority)?.host ?? 'localhost'; // We want to always set servername to support SNI - connectionOptions.servername = targetAuthority; + connectionOptions.servername = authorityHostname; } if (proxyConnectionResult.socket) { /* This is part of the workaround for From 46c84bdb4ef87969e370ff9b39c542a78b58d746 Mon Sep 17 00:00:00 2001 From: Michael Lumish Date: Tue, 7 Jul 2020 10:51:42 -0700 Subject: [PATCH 22/24] grpc-js: Improve error handling in a few places --- packages/grpc-js/package.json | 2 +- packages/grpc-js/src/call-stream.ts | 9 +++++++++ packages/grpc-js/src/subchannel.ts | 27 +++++++++++++++++++++++++-- 3 files changed, 35 insertions(+), 3 deletions(-) diff --git a/packages/grpc-js/package.json b/packages/grpc-js/package.json index da8e28ae..27aa679c 100644 --- a/packages/grpc-js/package.json +++ b/packages/grpc-js/package.json @@ -1,6 +1,6 @@ { "name": "@grpc/grpc-js", - "version": "1.1.1", + "version": "1.1.2", "description": "gRPC Library for Node - pure JS implementation", "homepage": "https://grpc.io/", "repository": "https://github.com/grpc/grpc-node/tree/master/packages/grpc-js", diff --git a/packages/grpc-js/src/call-stream.ts b/packages/grpc-js/src/call-stream.ts index 041f9651..825342a5 100644 --- a/packages/grpc-js/src/call-stream.ts +++ b/packages/grpc-js/src/call-stream.ts @@ -541,6 +541,15 @@ export class Http2CallStream implements Call { code = Status.PERMISSION_DENIED; details = 'Protocol not secure enough'; break; + case http2.constants.NGHTTP2_INTERNAL_ERROR: + code = Status.INTERNAL; + /* This error code was previously handled in the default case, and + * there are several instances of it online, so I wanted to + * preserve the original error message so that people find existing + * information in searches, but also include the more recognizable + * "Internal server error" message. */ + details = `Received RST_STREAM with code ${stream.rstCode} (Internal server error)`; + break; default: code = Status.INTERNAL; details = `Received RST_STREAM with code ${stream.rstCode}`; diff --git a/packages/grpc-js/src/subchannel.ts b/packages/grpc-js/src/subchannel.ts index bbaa1ce1..be72680e 100644 --- a/packages/grpc-js/src/subchannel.ts +++ b/packages/grpc-js/src/subchannel.ts @@ -309,7 +309,8 @@ export class Subchannel { }; connectionOptions.servername = sslTargetNameOverride; } else { - const authorityHostname = splitHostPort(targetAuthority)?.host ?? 'localhost'; + const authorityHostname = + splitHostPort(targetAuthority)?.host ?? 'localhost'; // We want to always set servername to support SNI connectionOptions.servername = authorityHostname; } @@ -413,6 +414,11 @@ export class Subchannel { KEEPALIVE_MAX_TIME_MS ); } + trace( + this.subchannelAddress + + ' connection closed by GOAWAY with code ' + + errorCode + ); this.transitionToState( [ConnectivityState.CONNECTING, ConnectivityState.READY], ConnectivityState.IDLE @@ -661,7 +667,24 @@ export class Subchannel { headers[HTTP2_HEADER_METHOD] = 'POST'; headers[HTTP2_HEADER_PATH] = callStream.getMethod(); headers[HTTP2_HEADER_TE] = 'trailers'; - const http2Stream = this.session!.request(headers); + let http2Stream: http2.ClientHttp2Stream; + /* In theory, if an error is thrown by session.request because session has + * become unusable (e.g. because it has received a goaway), this subchannel + * should soon see the corresponding close or goaway event anyway and leave + * READY. But we have seen reports that this does not happen + * (https://github.com/googleapis/nodejs-firestore/issues/1023#issuecomment-653204096) + * so for defense in depth, we just discard the session when we see an + * error here. + */ + try { + http2Stream = this.session!.request(headers); + } catch (e) { + this.transitionToState( + [ConnectivityState.READY], + ConnectivityState.TRANSIENT_FAILURE + ); + throw e; + } let headersString = ''; for (const header of Object.keys(headers)) { headersString += '\t\t' + header + ': ' + headers[header] + '\n'; From 7cf93591ca278b6cad78d9133fa650480c8ccd60 Mon Sep 17 00:00:00 2001 From: Michael Lumish Date: Thu, 9 Jul 2020 09:57:00 -0700 Subject: [PATCH 23/24] grpc-js: Remove peerDependency on google-auth-library --- packages/grpc-js/package.json | 3 --- 1 file changed, 3 deletions(-) diff --git a/packages/grpc-js/package.json b/packages/grpc-js/package.json index da8e28ae..8a60a647 100644 --- a/packages/grpc-js/package.json +++ b/packages/grpc-js/package.json @@ -59,9 +59,6 @@ "dependencies": { "semver": "^6.2.0" }, - "peerDependencies": { - "google-auth-library": "5.x || 6.x" - }, "files": [ "src/*.ts", "build/src/*.{js,d.ts,js.map}", From cee9a455a6f2b4dfab02e55d86a3ec0dc595488a Mon Sep 17 00:00:00 2001 From: Michael Lumish Date: Fri, 10 Jul 2020 09:04:52 -0700 Subject: [PATCH 24/24] Load google/protobuf/* into common using require --- packages/proto-loader/package.json | 2 +- packages/proto-loader/src/index.ts | 42 +++++++++++-------- .../proto-loader/test_protos/well_known.proto | 4 ++ 3 files changed, 29 insertions(+), 19 deletions(-) diff --git a/packages/proto-loader/package.json b/packages/proto-loader/package.json index 022eb41c..cba96b43 100644 --- a/packages/proto-loader/package.json +++ b/packages/proto-loader/package.json @@ -1,6 +1,6 @@ { "name": "@grpc/proto-loader", - "version": "0.5.4", + "version": "0.5.5", "author": "Google Inc.", "contributors": [ { diff --git a/packages/proto-loader/src/index.ts b/packages/proto-loader/src/index.ts index b29e26f3..ffef89d3 100644 --- a/packages/proto-loader/src/index.ts +++ b/packages/proto-loader/src/index.ts @@ -369,23 +369,29 @@ export function loadSync( } // Load Google's well-known proto files that aren't exposed by Protobuf.js. -{ - // Protobuf.js exposes: any, duration, empty, field_mask, struct, timestamp, - // and wrappers. compiler/plugin is excluded in Protobuf.js and here. - const wellKnownProtos = ['api', 'descriptor', 'source_context', 'type']; - const sourceDir = path.join( - path.dirname(require.resolve('protobufjs')), - 'google', - 'protobuf' - ); - for (const proto of wellKnownProtos) { - const file = path.join(sourceDir, `${proto}.proto`); - const descriptor = Protobuf.loadSync(file).toJSON(); +// Protobuf.js exposes: any, duration, empty, field_mask, struct, timestamp, +// and wrappers. compiler/plugin is excluded in Protobuf.js and here. - Protobuf.common( - proto, - (descriptor.nested!.google as Protobuf.INamespace).nested! - ); - } -} +// Using constant strings for compatibility with tools like Webpack +const apiDescriptor = require('protobufjs/google/protobuf/api.json'); +const descriptorDescriptor = require('protobufjs/google/protobuf/descriptor.json'); +const sourceContextDescriptor = require('protobufjs/google/protobuf/source_context.json'); +const typeDescriptor = require('protobufjs/google/protobuf/type.json'); + +Protobuf.common( + 'api', + apiDescriptor.nested.google.nested.protobuf.nested +); +Protobuf.common( + 'descriptor', + descriptorDescriptor.nested.google.nested.protobuf.nested +); +Protobuf.common( + 'source_context', + sourceContextDescriptor.nested.google.nested.protobuf.nested +); +Protobuf.common( + 'type', + typeDescriptor.nested.google.nested.protobuf.nested +); diff --git a/packages/proto-loader/test_protos/well_known.proto b/packages/proto-loader/test_protos/well_known.proto index dd70402b..3ff2292f 100644 --- a/packages/proto-loader/test_protos/well_known.proto +++ b/packages/proto-loader/test_protos/well_known.proto @@ -19,3 +19,7 @@ import "google/protobuf/descriptor.proto"; extend google.protobuf.FieldOptions { bool redact = 52000; } + +message DescriptorHolder { + google.protobuf.DescriptorProto descriptor = 1; +}