diff --git a/packages/grpc-js/package.json b/packages/grpc-js/package.json index 310845e7..1f28e324 100644 --- a/packages/grpc-js/package.json +++ b/packages/grpc-js/package.json @@ -1,6 +1,6 @@ { "name": "@grpc/grpc-js", - "version": "1.0.5", + "version": "1.1.2", "description": "gRPC Library for Node - pure JS implementation", "homepage": "https://grpc.io/", "repository": "https://github.com/grpc/grpc-node/tree/master/packages/grpc-js", @@ -60,9 +60,6 @@ "dependencies": { "semver": "^6.2.0" }, - "peerDependencies": { - "google-auth-library": "5.x || 6.x" - }, "files": [ "src/**/*.ts", "build/src/*.{js,d.ts,js.map}", diff --git a/packages/grpc-js/src/call-credentials-filter.ts b/packages/grpc-js/src/call-credentials-filter.ts index d39af832..5263d97f 100644 --- a/packages/grpc-js/src/call-credentials-filter.ts +++ b/packages/grpc-js/src/call-credentials-filter.ts @@ -20,6 +20,7 @@ import { Channel } from './channel'; import { BaseFilter, Filter, FilterFactory } from './filter'; import { Metadata } from './metadata'; import { Status } from './constants'; +import { splitHostPort } from './uri-parser'; export class CallCredentialsFilter extends BaseFilter implements Filter { private serviceUrl: string; @@ -38,9 +39,10 @@ export class CallCredentialsFilter extends BaseFilter implements Filter { if (splitPath.length >= 2) { serviceName = splitPath[1]; } + const hostname = splitHostPort(stream.getHost())?.host ?? 'localhost'; /* Currently, call credentials are only allowed on HTTPS connections, so we * can assume that the scheme is "https" */ - this.serviceUrl = `https://${stream.getHost()}/${serviceName}`; + this.serviceUrl = `https://${hostname}/${serviceName}`; } async sendMetadata(metadata: Promise): Promise { diff --git a/packages/grpc-js/src/call-stream.ts b/packages/grpc-js/src/call-stream.ts index 041f9651..825342a5 100644 --- a/packages/grpc-js/src/call-stream.ts +++ b/packages/grpc-js/src/call-stream.ts @@ -541,6 +541,15 @@ export class Http2CallStream implements Call { code = Status.PERMISSION_DENIED; details = 'Protocol not secure enough'; break; + case http2.constants.NGHTTP2_INTERNAL_ERROR: + code = Status.INTERNAL; + /* This error code was previously handled in the default case, and + * there are several instances of it online, so I wanted to + * preserve the original error message so that people find existing + * information in searches, but also include the more recognizable + * "Internal server error" message. */ + details = `Received RST_STREAM with code ${stream.rstCode} (Internal server error)`; + break; default: code = Status.INTERNAL; details = `Received RST_STREAM with code ${stream.rstCode}`; diff --git a/packages/grpc-js/src/client.ts b/packages/grpc-js/src/client.ts index 8ebb2537..5f78ffe5 100644 --- a/packages/grpc-js/src/client.ts +++ b/packages/grpc-js/src/client.ts @@ -558,9 +558,7 @@ export class Client { }, // eslint-disable-next-line @typescript-eslint/no-explicit-any onReceiveMessage(message: any) { - if (stream.push(message)) { - call.startRead(); - } + stream.push(message); }, onReceiveStatus(status: StatusObject) { if (receivedStatus) { @@ -656,9 +654,7 @@ export class Client { stream.emit('metadata', metadata); }, onReceiveMessage(message: Buffer) { - if (stream.push(message)) { - call.startRead(); - } + stream.push(message) }, onReceiveStatus(status: StatusObject) { if (receivedStatus) { diff --git a/packages/grpc-js/src/constants.ts b/packages/grpc-js/src/constants.ts index 55d93403..e760658d 100644 --- a/packages/grpc-js/src/constants.ts +++ b/packages/grpc-js/src/constants.ts @@ -41,8 +41,20 @@ export enum LogVerbosity { ERROR, } +/** + * NOTE: This enum is not currently used in any implemented API in this + * library. It is included only for type parity with the other implementation. + */ +export enum Propagate { + DEADLINE = 1, + CENSUS_STATS_CONTEXT = 2, + CENSUS_TRACING_CONTEXT = 4, + CANCELLATION = 8, + DEFAULTS = 65536, +} + // -1 means unlimited export const DEFAULT_MAX_SEND_MESSAGE_LENGTH = -1; // 4 MB default -export const DEFAULT_MAX_RECEIVE_MESSAGE_LENGTH = 4 * 1024 * 1024; \ No newline at end of file +export const DEFAULT_MAX_RECEIVE_MESSAGE_LENGTH = 4 * 1024 * 1024; diff --git a/packages/grpc-js/src/index.ts b/packages/grpc-js/src/index.ts index 9fc89f2e..f79a369b 100644 --- a/packages/grpc-js/src/index.ts +++ b/packages/grpc-js/src/index.ts @@ -36,7 +36,7 @@ import { CallProperties, UnaryCallback, } from './client'; -import { LogVerbosity, Status } from './constants'; +import { LogVerbosity, Status, Propagate } from './constants'; import * as logging from './logging'; import { Deserialize, @@ -127,6 +127,7 @@ export { LogVerbosity as logVerbosity, Status as status, ConnectivityState as connectivityState, + Propagate as propagate, // TODO: Other constants as well }; diff --git a/packages/grpc-js/src/load-balancer-priority.ts b/packages/grpc-js/src/load-balancer-priority.ts new file mode 100644 index 00000000..d3070715 --- /dev/null +++ b/packages/grpc-js/src/load-balancer-priority.ts @@ -0,0 +1,467 @@ +/* + * Copyright 2020 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +import { + LoadBalancer, + ChannelControlHelper, + getFirstUsableConfig, + registerLoadBalancerType, +} from './load-balancer'; +import { SubchannelAddress } from './subchannel'; +import { + LoadBalancingConfig, + isPriorityLoadBalancingConfig, +} from './load-balancing-config'; +import { ConnectivityState } from './channel'; +import { Picker, QueuePicker, UnavailablePicker } from './picker'; +import { ChildLoadBalancerHandler } from './load-balancer-child-handler'; +import { ChannelOptions } from './channel-options'; +import { Status } from './constants'; +import { Metadata } from './metadata'; + +const TYPE_NAME = 'priority'; + +const DEFAULT_FAILOVER_TIME_MS = 10_000; +const DEFAULT_RETENTION_INTERVAL_MS = 15 * 60 * 1000; + +export type LocalitySubchannelAddress = SubchannelAddress & { + localityPath: string[]; +}; + +export function isLocalitySubchannelAddress( + address: SubchannelAddress +): address is LocalitySubchannelAddress { + return Array.isArray((address as LocalitySubchannelAddress).localityPath); +} + +interface PriorityChildBalancer { + updateAddressList( + addressList: SubchannelAddress[], + lbConfig: LoadBalancingConfig, + attributes: { [key: string]: unknown } + ): void; + exitIdle(): void; + resetBackoff(): void; + deactivate(): void; + maybeReactivate(): void; + cancelFailoverTimer(): void; + isFailoverTimerPending(): boolean; + getConnectivityState(): ConnectivityState; + getPicker(): Picker; + getName(): string; + destroy(): void; +} + +interface UpdateArgs { + subchannelAddress: SubchannelAddress[]; + lbConfig: LoadBalancingConfig; +} + +export class PriorityLoadBalancer implements LoadBalancer { + /** + * Inner class for holding a child priority and managing associated timers. + */ + private PriorityChildImpl = class implements PriorityChildBalancer { + private connectivityState: ConnectivityState = ConnectivityState.IDLE; + private picker: Picker; + private childBalancer: ChildLoadBalancerHandler; + private failoverTimer: NodeJS.Timer | null = null; + private deactivationTimer: NodeJS.Timer | null = null; + constructor(private parent: PriorityLoadBalancer, private name: string) { + this.childBalancer = new ChildLoadBalancerHandler({ + createSubchannel: ( + subchannelAddress: SubchannelAddress, + subchannelArgs: ChannelOptions + ) => { + return this.parent.channelControlHelper.createSubchannel( + subchannelAddress, + subchannelArgs + ); + }, + updateState: (connectivityState: ConnectivityState, picker: Picker) => { + this.updateState(connectivityState, picker); + }, + requestReresolution: () => { + this.parent.channelControlHelper.requestReresolution(); + }, + }); + this.picker = new QueuePicker(this.childBalancer); + } + + private updateState(connectivityState: ConnectivityState, picker: Picker) { + this.connectivityState = connectivityState; + this.picker = picker; + this.parent.onChildStateChange(this); + } + + private startFailoverTimer() { + if (this.failoverTimer === null) { + this.failoverTimer = setTimeout(() => { + this.failoverTimer = null; + this.updateState( + ConnectivityState.TRANSIENT_FAILURE, + new UnavailablePicker() + ); + }, DEFAULT_FAILOVER_TIME_MS); + } + } + + updateAddressList( + addressList: SubchannelAddress[], + lbConfig: LoadBalancingConfig, + attributes: { [key: string]: unknown } + ): void { + this.childBalancer.updateAddressList(addressList, lbConfig, attributes); + this.startFailoverTimer(); + } + + exitIdle() { + if (this.connectivityState === ConnectivityState.IDLE) { + this.startFailoverTimer(); + } + this.childBalancer.exitIdle(); + } + + resetBackoff() { + this.childBalancer.resetBackoff(); + } + + deactivate() { + if (this.deactivationTimer === null) { + this.deactivationTimer = setTimeout(() => { + this.parent.deleteChild(this); + this.childBalancer.destroy(); + }, DEFAULT_RETENTION_INTERVAL_MS); + } + } + + maybeReactivate() { + if (this.deactivationTimer !== null) { + clearTimeout(this.deactivationTimer); + this.deactivationTimer = null; + } + } + + cancelFailoverTimer() { + if (this.failoverTimer !== null) { + clearTimeout(this.failoverTimer); + this.failoverTimer = null; + } + } + + isFailoverTimerPending() { + return this.failoverTimer !== null; + } + + getConnectivityState() { + return this.connectivityState; + } + + getPicker() { + return this.picker; + } + + getName() { + return this.name; + } + + destroy() { + this.childBalancer.destroy(); + } + }; + // End of inner class PriorityChildImpl + + private children: Map = new Map< + string, + PriorityChildBalancer + >(); + /** + * The priority order of child names from the latest config update. + */ + private priorities: string[] = []; + /** + * The attributes object from the latest update, saved to be passed along to + * each new child as they are created + */ + private latestAttributes: { [key: string]: unknown } = {}; + /** + * The latest load balancing policies and address lists for each child from + * the latest update + */ + private latestUpdates: Map = new Map< + string, + UpdateArgs + >(); + /** + * Current chosen priority that requests are sent to + */ + private currentPriority: number | null = null; + /** + * After an update, this preserves the currently selected child from before + * the update. We continue to use that child until it disconnects, or + * another higher-priority child connects, or it is deleted because it is not + * in the new priority list at all and its retention interval has expired, or + * we try and fail to connect to every child in the new priority list. + */ + private currentChildFromBeforeUpdate: PriorityChildBalancer | null = null; + + constructor(private channelControlHelper: ChannelControlHelper) {} + + private updateState(state: ConnectivityState, picker: Picker) { + /* If switching to IDLE, use a QueuePicker attached to this load balancer + * so that when the picker calls exitIdle, that in turn calls exitIdle on + * the PriorityChildImpl, which will start the failover timer. */ + if (state === ConnectivityState.IDLE) { + picker = new QueuePicker(this); + } + this.channelControlHelper.updateState(state, picker); + } + + private onChildStateChange(child: PriorityChildBalancer) { + const childState = child.getConnectivityState(); + if (child === this.currentChildFromBeforeUpdate) { + if ( + childState === ConnectivityState.READY || + childState === ConnectivityState.IDLE + ) { + this.updateState(childState, child.getPicker()); + } else { + this.currentChildFromBeforeUpdate = null; + this.tryNextPriority(true); + } + return; + } + const childPriority = this.priorities.indexOf(child.getName()); + if (childPriority < 0) { + // child is not in the priority list, ignore updates + return; + } + if (this.currentPriority !== null && childPriority > this.currentPriority) { + // child is lower priority than the currently selected child, ignore updates + return; + } + if (childState === ConnectivityState.TRANSIENT_FAILURE) { + /* Report connecting if and only if the currently selected child is the + * one entering TRANSIENT_FAILURE */ + this.tryNextPriority(childPriority === this.currentPriority); + return; + } + if (this.currentPriority === null || childPriority < this.currentPriority) { + /* In this case, either there is no currently selected child or this + * child is higher priority than the currently selected child, so we want + * to switch to it if it is READY or IDLE. */ + if ( + childState === ConnectivityState.READY || + childState === ConnectivityState.IDLE + ) { + this.selectPriority(childPriority); + } + return; + } + /* The currently selected child has updated state to something other than + * TRANSIENT_FAILURE, so we pass that update along */ + this.updateState(childState, child.getPicker()); + } + + private deleteChild(child: PriorityChildBalancer) { + if (child === this.currentChildFromBeforeUpdate) { + this.currentChildFromBeforeUpdate = null; + /* If we get to this point, the currentChildFromBeforeUpdate was still in + * use, so we are still trying to connect to the specified priorities */ + this.tryNextPriority(true); + } + } + + /** + * Select the child at the specified priority, and report that child's state + * as this balancer's state until that child disconnects or a higher-priority + * child connects. + * @param priority + */ + private selectPriority(priority: number) { + this.currentPriority = priority; + const chosenChild = this.children.get(this.priorities[priority])!; + this.updateState( + chosenChild.getConnectivityState(), + chosenChild.getPicker() + ); + this.currentChildFromBeforeUpdate = null; + // Deactivate each child of lower priority than the chosen child + for (let i = priority + 1; i < this.priorities.length; i++) { + this.children.get(this.priorities[i])?.deactivate(); + } + } + + /** + * Check each child in priority order until we find one to use + * @param reportConnecting Whether we should report a CONNECTING state if we + * stop before picking a specific child. This should be true when we have + * not already selected a child. + */ + private tryNextPriority(reportConnecting: boolean) { + for (const [index, childName] of this.priorities.entries()) { + let child = this.children.get(childName); + /* If the child doesn't already exist, create it and update it. */ + if (child === undefined) { + if (reportConnecting) { + this.updateState(ConnectivityState.CONNECTING, new QueuePicker(this)); + } + child = new this.PriorityChildImpl(this, childName); + this.children.set(childName, child); + const childUpdate = this.latestUpdates.get(childName); + if (childUpdate !== undefined) { + child.updateAddressList( + childUpdate.subchannelAddress, + childUpdate.lbConfig, + this.latestAttributes + ); + } + } + /* We're going to try to use this child, so reactivate it if it has been + * deactivated */ + child.maybeReactivate(); + if ( + child.getConnectivityState() === ConnectivityState.READY || + child.getConnectivityState() === ConnectivityState.IDLE + ) { + this.selectPriority(index); + return; + } + if (child.isFailoverTimerPending()) { + /* This child is still trying to connect. Wait until its failover timer + * has ended to continue to the next one */ + if (reportConnecting) { + this.updateState(ConnectivityState.CONNECTING, new QueuePicker(this)); + } + return; + } + } + this.currentPriority = null; + this.currentChildFromBeforeUpdate = null; + this.updateState( + ConnectivityState.TRANSIENT_FAILURE, + new UnavailablePicker({ + code: Status.UNAVAILABLE, + details: 'No ready priority', + metadata: new Metadata(), + }) + ); + } + + updateAddressList( + addressList: SubchannelAddress[], + lbConfig: LoadBalancingConfig, + attributes: { [key: string]: unknown } + ): void { + if (!isPriorityLoadBalancingConfig(lbConfig)) { + // Reject a config of the wrong type + return; + } + const priorityConfig = lbConfig.priority; + /* For each address, the first element of its localityPath array determines + * which child it belongs to. So we bucket those addresses by that first + * element, and pass along the rest of the localityPath for that child + * to use. */ + const childAddressMap: Map = new Map< + string, + LocalitySubchannelAddress[] + >(); + for (const address of addressList) { + if (!isLocalitySubchannelAddress(address)) { + // Reject address that cannot be prioritized + return; + } + if (address.localityPath.length < 1) { + // Reject address that cannot be prioritized + return; + } + const childName = address.localityPath[0]; + const childAddress: LocalitySubchannelAddress = { + ...address, + localityPath: address.localityPath.slice(1), + }; + let childAddressList = childAddressMap.get(childName); + if (childAddressList === undefined) { + childAddressList = []; + childAddressMap.set(childName, childAddressList); + } + childAddressList.push(childAddress); + } + if (this.currentPriority !== null) { + this.currentChildFromBeforeUpdate = this.children.get( + this.priorities[this.currentPriority] + )!; + this.currentPriority = null; + } + this.latestAttributes = attributes; + this.latestUpdates.clear(); + this.priorities = priorityConfig.priorities; + /* Pair up the new child configs with the corresponding address lists, and + * update all existing children with their new configs */ + for (const [childName, childConfig] of priorityConfig.children) { + const chosenChildConfig = getFirstUsableConfig(childConfig.config); + if (chosenChildConfig !== null) { + const childAddresses = childAddressMap.get(childName) ?? []; + this.latestUpdates.set(childName, { + subchannelAddress: childAddresses, + lbConfig: chosenChildConfig, + }); + const existingChild = this.children.get(childName); + if (existingChild !== undefined) { + existingChild.updateAddressList( + childAddresses, + chosenChildConfig, + attributes + ); + } + } + } + // Deactivate all children that are no longer in the priority list + for (const [childName, child] of this.children) { + if (this.priorities.indexOf(childName) < 0) { + child.deactivate(); + } + } + // Only report connecting if there are no existing children + this.tryNextPriority(this.children.size === 0); + } + exitIdle(): void { + if (this.currentPriority !== null) { + this.children.get(this.priorities[this.currentPriority])?.exitIdle(); + } + } + resetBackoff(): void { + for (const child of this.children.values()) { + child.resetBackoff(); + } + } + destroy(): void { + for (const child of this.children.values()) { + child.destroy(); + } + this.children.clear(); + this.currentChildFromBeforeUpdate?.destroy(); + this.currentChildFromBeforeUpdate = null; + } + getTypeName(): string { + return TYPE_NAME; + } +} + +export function setup() { + registerLoadBalancerType(TYPE_NAME, PriorityLoadBalancer); +} diff --git a/packages/grpc-js/src/load-balancer-weighted-target.ts b/packages/grpc-js/src/load-balancer-weighted-target.ts new file mode 100644 index 00000000..04145914 --- /dev/null +++ b/packages/grpc-js/src/load-balancer-weighted-target.ts @@ -0,0 +1,322 @@ +/* + * Copyright 2020 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +import { LoadBalancer, ChannelControlHelper, getFirstUsableConfig, registerLoadBalancerType } from "./load-balancer"; +import { SubchannelAddress } from "./subchannel"; +import { LoadBalancingConfig, WeightedTarget, isWeightedTargetLoadBalancingConfig } from "./load-balancing-config"; +import { Picker, PickResult, PickArgs, QueuePicker, UnavailablePicker } from "./picker"; +import { ConnectivityState } from "./channel"; +import { ChildLoadBalancerHandler } from "./load-balancer-child-handler"; +import { Status } from "./constants"; +import { Metadata } from "./metadata"; +import { isLocalitySubchannelAddress, LocalitySubchannelAddress } from "./load-balancer-priority"; + +const TYPE_NAME = 'weighted_target'; + +const DEFAULT_RETENTION_INTERVAL_MS = 15 * 60 * 1000; + +/** + * Represents a picker and a subinterval of a larger interval used for randomly + * selecting an element of a list of these objects. + */ +interface WeightedPicker { + picker: Picker; + /** + * The exclusive end of the interval associated with this element. The start + * of the interval is implicitly the rangeEnd of the previous element in the + * list, or 0 for the first element in the list. + */ + rangeEnd: number; +} + +class WeightedTargetPicker implements Picker { + private rangeTotal: number; + constructor(private readonly pickerList: WeightedPicker[]) { + this.rangeTotal = pickerList[pickerList.length - 1].rangeEnd; + } + pick(pickArgs: PickArgs): PickResult { + // num | 0 is equivalent to floor(num) + const selection = (Math.random() * this.rangeTotal) | 0; + + /* Binary search for the element of the list such that + * pickerList[index - 1].rangeEnd <= selection < pickerList[index].rangeEnd + */ + let mid = 0; + let startIndex = 0; + let endIndex = this.pickerList.length - 1; + let index = 0; + while (endIndex > startIndex) { + mid = ((startIndex + endIndex) / 2) | 0; + if (this.pickerList[mid].rangeEnd > selection) { + endIndex = mid; + } else if (this.pickerList[mid].rangeEnd < selection) { + startIndex = mid + 1; + } else { + // + 1 here because the range is exclusive at the top end + index = mid + 1; + break; + } + } + if (index === 0) { + index = startIndex; + } + + return this.pickerList[index].picker.pick(pickArgs); + } +} + +interface WeightedChild { + updateAddressList(addressList: SubchannelAddress[], lbConfig: WeightedTarget, attributes: { [key: string]: unknown; }): void; + exitIdle(): void; + resetBackoff(): void; + destroy(): void; + deactivate(): void; + maybeReactivate(): void; + getConnectivityState(): ConnectivityState; + getPicker(): Picker; + getWeight(): number; +} + +export class WeightedTargetLoadBalancer implements LoadBalancer { + private WeightedChildImpl = class implements WeightedChild { + private connectivityState: ConnectivityState = ConnectivityState.IDLE; + private picker: Picker; + private childBalancer: ChildLoadBalancerHandler; + private deactivationTimer: NodeJS.Timer | null = null; + private weight: number = 0; + + constructor(private parent: WeightedTargetLoadBalancer, private name: string) { + this.childBalancer = new ChildLoadBalancerHandler({ + createSubchannel: (subchannelAddress, subchannelOptions) => { + return this.parent.channelControlHelper.createSubchannel(subchannelAddress, subchannelOptions); + }, + updateState: (connectivityState, picker) => { + this.updateState(connectivityState, picker); + }, + requestReresolution: () => { + this.parent.channelControlHelper.requestReresolution(); + } + }); + + this.picker = new QueuePicker(this.childBalancer); + } + + private updateState(connectivityState: ConnectivityState, picker: Picker) { + this.connectivityState = connectivityState; + this.picker = picker; + this.parent.updateState(); + } + + updateAddressList(addressList: SubchannelAddress[], lbConfig: WeightedTarget, attributes: { [key: string]: unknown; }): void { + this.weight = lbConfig.weight; + const childConfig = getFirstUsableConfig(lbConfig.child_policy); + if (childConfig !== null) { + this.childBalancer.updateAddressList(addressList, childConfig, attributes); + } + } + exitIdle(): void { + this.childBalancer.exitIdle(); + } + resetBackoff(): void { + this.childBalancer.resetBackoff(); + } + destroy(): void { + this.childBalancer.destroy(); + if (this.deactivationTimer !== null) { + clearTimeout(this.deactivationTimer); + } + } + deactivate(): void { + if (this.deactivationTimer === null) { + this.deactivationTimer = setTimeout(() => { + this.parent.targets.delete(this.name); + this.deactivationTimer = null; + }, DEFAULT_RETENTION_INTERVAL_MS); + } + } + maybeReactivate(): void { + if (this.deactivationTimer !== null) { + clearTimeout(this.deactivationTimer); + this.deactivationTimer = null; + } + } + getConnectivityState(): ConnectivityState { + return this.connectivityState; + } + getPicker(): Picker { + return this.picker; + } + getWeight(): number { + return this.weight; + } + } + // end of WeightedChildImpl + + /** + * Map of target names to target children. Includes current targets and + * previous targets with deactivation timers that have not yet triggered. + */ + private targets: Map = new Map(); + /** + * List of current target names. + */ + private targetList: string[] = []; + + constructor(private channelControlHelper: ChannelControlHelper) {} + + private updateState() { + const pickerList: WeightedPicker[] = []; + let end = 0; + + let connectingCount = 0; + let idleCount = 0; + let transientFailureCount = 0; + for (const targetName of this.targetList) { + const target = this.targets.get(targetName); + if (target === undefined) { + continue; + } + switch (target.getConnectivityState()) { + case ConnectivityState.READY: + end += target.getWeight(); + pickerList.push({ + picker: target.getPicker(), + rangeEnd: end + }); + break; + case ConnectivityState.CONNECTING: + connectingCount += 1; + break; + case ConnectivityState.IDLE: + idleCount += 1; + break; + case ConnectivityState.TRANSIENT_FAILURE: + transientFailureCount += 1; + break; + default: + // Ignore the other possiblity, SHUTDOWN + } + } + + let connectivityState: ConnectivityState; + if (pickerList.length > 0) { + connectivityState = ConnectivityState.READY; + } else if (connectingCount > 0) { + connectivityState = ConnectivityState.CONNECTING; + } else if (idleCount > 0) { + connectivityState = ConnectivityState.IDLE; + } else { + connectivityState = ConnectivityState.TRANSIENT_FAILURE; + } + + let picker: Picker; + switch (connectivityState) { + case ConnectivityState.READY: + picker = new WeightedTargetPicker(pickerList); + break; + case ConnectivityState.CONNECTING: + case ConnectivityState.READY: + picker = new QueuePicker(this); + break; + default: + picker = new UnavailablePicker({ + code: Status.UNAVAILABLE, + details: 'weighted_target: all children report state TRANSIENT_FAILURE', + metadata: new Metadata() + }); + } + this.channelControlHelper.updateState(connectivityState, picker); + } + + updateAddressList(addressList: SubchannelAddress[], lbConfig: LoadBalancingConfig, attributes: { [key: string]: unknown; }): void { + if (!isWeightedTargetLoadBalancingConfig(lbConfig)) { + // Reject a config of the wrong type + return; + } + + /* For each address, the first element of its localityPath array determines + * which child it belongs to. So we bucket those addresses by that first + * element, and pass along the rest of the localityPath for that child + * to use. */ + const childAddressMap = new Map(); + for (const address of addressList) { + if (!isLocalitySubchannelAddress(address)) { + // Reject address that cannot be associated with targets + return; + } + if (address.localityPath.length < 1) { + // Reject address that cannot be associated with targets + return; + } + const childName = address.localityPath[0]; + const childAddress: LocalitySubchannelAddress = { + ...address, + localityPath: address.localityPath.slice(1), + }; + let childAddressList = childAddressMap.get(childName); + if (childAddressList === undefined) { + childAddressList = []; + childAddressMap.set(childName, childAddressList); + } + childAddressList.push(childAddress); + } + + this.targetList = Array.from(lbConfig.weighted_target.targets.keys()); + for (const [targetName, targetConfig] of lbConfig.weighted_target.targets) { + let target = this.targets.get(targetName); + if (target === undefined) { + target = new this.WeightedChildImpl(this, targetName); + this.targets.set(targetName, target); + } else { + target.maybeReactivate(); + } + target.updateAddressList(childAddressMap.get(targetName) ?? [], targetConfig, attributes); + } + + // Deactivate targets that are not in the new config + for (const [targetName, target] of this.targets) { + if (this.targetList.indexOf(targetName) < 0) { + target.deactivate(); + } + } + + this.updateState(); + } + exitIdle(): void { + for (const targetName of this.targetList) { + this.targets.get(targetName)?.exitIdle(); + } + } + resetBackoff(): void { + for (const targetName of this.targetList) { + this.targets.get(targetName)?.resetBackoff(); + } + } + destroy(): void { + for (const target of this.targets.values()) { + target.destroy(); + } + this.targets.clear(); + } + getTypeName(): string { + return TYPE_NAME; + } +} + +export function setup() { + registerLoadBalancerType(TYPE_NAME, WeightedTargetLoadBalancer); +} \ No newline at end of file diff --git a/packages/grpc-js/src/load-balancer.ts b/packages/grpc-js/src/load-balancer.ts index dc4cf4e3..9a1c1fcd 100644 --- a/packages/grpc-js/src/load-balancer.ts +++ b/packages/grpc-js/src/load-balancer.ts @@ -22,6 +22,8 @@ import { Picker } from './picker'; import { LoadBalancingConfig } from './load-balancing-config'; import * as load_balancer_pick_first from './load-balancer-pick-first'; import * as load_balancer_round_robin from './load-balancer-round-robin'; +import * as load_balancer_priority from './load-balancer-priority'; +import * as load_balancer_weighted_target from './load-balancer-weighted-target'; /** * A collection of functions associated with a channel that a load balancer @@ -137,4 +139,6 @@ export function getFirstUsableConfig( export function registerAll() { load_balancer_pick_first.setup(); load_balancer_round_robin.setup(); + load_balancer_priority.setup(); + load_balancer_weighted_target.setup(); } diff --git a/packages/grpc-js/src/load-balancing-config.ts b/packages/grpc-js/src/load-balancing-config.ts index 0a06d5f8..736e3d42 100644 --- a/packages/grpc-js/src/load-balancing-config.ts +++ b/packages/grpc-js/src/load-balancing-config.ts @@ -48,6 +48,15 @@ export interface PriorityLbConfig { priorities: string[]; } +export interface WeightedTarget { + weight: number; + child_policy: LoadBalancingConfig[]; +} + +export interface WeightedTargetLbConfig { + targets: Map; +} + export interface PickFirstLoadBalancingConfig { name: 'pick_first'; pick_first: PickFirstConfig; @@ -73,12 +82,18 @@ export interface PriorityLoadBalancingConfig { priority: PriorityLbConfig; } +export interface WeightedTargetLoadBalancingConfig { + name: 'weighted_target'; + weighted_target: WeightedTargetLbConfig; +} + export type LoadBalancingConfig = | PickFirstLoadBalancingConfig | RoundRobinLoadBalancingConfig | XdsLoadBalancingConfig | GrpcLbLoadBalancingConfig - | PriorityLoadBalancingConfig; + | PriorityLoadBalancingConfig + | WeightedTargetLoadBalancingConfig; export function isRoundRobinLoadBalancingConfig( lbconfig: LoadBalancingConfig @@ -104,6 +119,12 @@ export function isPriorityLoadBalancingConfig( return lbconfig.name === 'priority'; } +export function isWeightedTargetLoadBalancingConfig( + lbconfig: LoadBalancingConfig +): lbconfig is WeightedTargetLoadBalancingConfig { + return lbconfig.name === 'weighted_target'; +} + /* In these functions we assume the input came from a JSON object. Therefore we * expect that the prototype is uninteresting and that `in` can be used * effectively */ diff --git a/packages/grpc-js/src/resolver-dns.ts b/packages/grpc-js/src/resolver-dns.ts index 016f3bd1..8a7f4f22 100644 --- a/packages/grpc-js/src/resolver-dns.ts +++ b/packages/grpc-js/src/resolver-dns.ts @@ -269,12 +269,7 @@ class DnsResolver implements Resolver { * @param target */ static getDefaultAuthority(target: GrpcUri): string { - const hostPort = splitHostPort(target.path); - if (hostPort !== null) { - return hostPort.host; - } else { - throw new Error(`Failed to parse target ${uriToString(target)}`); - } + return target.path; } } diff --git a/packages/grpc-js/src/server.ts b/packages/grpc-js/src/server.ts index a6edee3b..ebd4504d 100644 --- a/packages/grpc-js/src/server.ts +++ b/packages/grpc-js/src/server.ts @@ -500,10 +500,11 @@ export class Server { } } - // If any sessions are active, close them gracefully. - pendingChecks += this.sessions.size; this.sessions.forEach((session) => { - session.close(maybeCallback); + if (!session.closed) { + pendingChecks += 1; + session.close(maybeCallback); + } }); if (pendingChecks === 0) { callback(); @@ -608,6 +609,10 @@ export class Server { } this.sessions.add(session); + + session.on('close', () => { + this.sessions.delete(session); + }); }); } } diff --git a/packages/grpc-js/src/subchannel.ts b/packages/grpc-js/src/subchannel.ts index 1d48604a..be72680e 100644 --- a/packages/grpc-js/src/subchannel.ts +++ b/packages/grpc-js/src/subchannel.ts @@ -28,7 +28,7 @@ import * as logging from './logging'; import { LogVerbosity } from './constants'; import { getProxiedConnection, ProxyConnectionResult } from './http_proxy'; import * as net from 'net'; -import { GrpcUri } from './uri-parser'; +import { GrpcUri, parseUri, splitHostPort } from './uri-parser'; import { ConnectionOptions } from 'tls'; import { FilterFactory, Filter } from './filter'; @@ -286,6 +286,9 @@ export class Subchannel { } private createSession(proxyConnectionResult: ProxyConnectionResult) { + const targetAuthority = getDefaultAuthority( + proxyConnectionResult.realTarget ?? this.channelTarget + ); let connectionOptions: http2.SecureClientSessionOptions = this.credentials._getConnectionOptions() || {}; let addressScheme = 'http://'; @@ -305,8 +308,18 @@ export class Subchannel { return checkServerIdentity(sslTargetNameOverride, cert); }; connectionOptions.servername = sslTargetNameOverride; + } else { + const authorityHostname = + splitHostPort(targetAuthority)?.host ?? 'localhost'; + // We want to always set servername to support SNI + connectionOptions.servername = authorityHostname; } if (proxyConnectionResult.socket) { + /* This is part of the workaround for + * https://github.com/nodejs/node/issues/32922. Without that bug, + * proxyConnectionResult.socket would always be a plaintext socket and + * this would say + * connectionOptions.socket = proxyConnectionResult.socket; */ connectionOptions.createConnection = (authority, option) => { return proxyConnectionResult.socket!; }; @@ -350,10 +363,7 @@ export class Subchannel { * determines whether the connection will be established over TLS or not. */ const session = http2.connect( - addressScheme + - getDefaultAuthority( - proxyConnectionResult.realTarget ?? this.channelTarget - ), + addressScheme + targetAuthority, connectionOptions ); this.session = session; @@ -404,6 +414,11 @@ export class Subchannel { KEEPALIVE_MAX_TIME_MS ); } + trace( + this.subchannelAddress + + ' connection closed by GOAWAY with code ' + + errorCode + ); this.transitionToState( [ConnectivityState.CONNECTING, ConnectivityState.READY], ConnectivityState.IDLE @@ -446,6 +461,18 @@ export class Subchannel { return checkServerIdentity(sslTargetNameOverride, cert); }; connectionOptions.servername = sslTargetNameOverride; + } else { + if ('grpc.http_connect_target' in this.options) { + /* This is more or less how servername will be set in createSession + * if a connection is successfully established through the proxy. + * If the proxy is not used, these connectionOptions are discarded + * anyway */ + connectionOptions.servername = getDefaultAuthority( + parseUri(this.options['grpc.http_connect_target'] as string) ?? { + path: 'localhost', + } + ); + } } } @@ -640,7 +667,24 @@ export class Subchannel { headers[HTTP2_HEADER_METHOD] = 'POST'; headers[HTTP2_HEADER_PATH] = callStream.getMethod(); headers[HTTP2_HEADER_TE] = 'trailers'; - const http2Stream = this.session!.request(headers); + let http2Stream: http2.ClientHttp2Stream; + /* In theory, if an error is thrown by session.request because session has + * become unusable (e.g. because it has received a goaway), this subchannel + * should soon see the corresponding close or goaway event anyway and leave + * READY. But we have seen reports that this does not happen + * (https://github.com/googleapis/nodejs-firestore/issues/1023#issuecomment-653204096) + * so for defense in depth, we just discard the session when we see an + * error here. + */ + try { + http2Stream = this.session!.request(headers); + } catch (e) { + this.transitionToState( + [ConnectivityState.READY], + ConnectivityState.TRANSIENT_FAILURE + ); + throw e; + } let headersString = ''; for (const header of Object.keys(headers)) { headersString += '\t\t' + header + ': ' + headers[header] + '\n'; diff --git a/packages/grpc-tools/package.json b/packages/grpc-tools/package.json index c7bb2794..67f6af4d 100644 --- a/packages/grpc-tools/package.json +++ b/packages/grpc-tools/package.json @@ -24,7 +24,7 @@ "prepublishOnly": "git submodule update --init --recursive && node copy_well_known_protos.js" }, "dependencies": { - "node-pre-gyp": "^0.12.0" + "node-pre-gyp": "^0.15.0" }, "binary": { "module_name": "grpc_tools", diff --git a/packages/proto-loader/package.json b/packages/proto-loader/package.json index 022eb41c..cba96b43 100644 --- a/packages/proto-loader/package.json +++ b/packages/proto-loader/package.json @@ -1,6 +1,6 @@ { "name": "@grpc/proto-loader", - "version": "0.5.4", + "version": "0.5.5", "author": "Google Inc.", "contributors": [ { diff --git a/packages/proto-loader/src/index.ts b/packages/proto-loader/src/index.ts index b29e26f3..ffef89d3 100644 --- a/packages/proto-loader/src/index.ts +++ b/packages/proto-loader/src/index.ts @@ -369,23 +369,29 @@ export function loadSync( } // Load Google's well-known proto files that aren't exposed by Protobuf.js. -{ - // Protobuf.js exposes: any, duration, empty, field_mask, struct, timestamp, - // and wrappers. compiler/plugin is excluded in Protobuf.js and here. - const wellKnownProtos = ['api', 'descriptor', 'source_context', 'type']; - const sourceDir = path.join( - path.dirname(require.resolve('protobufjs')), - 'google', - 'protobuf' - ); - for (const proto of wellKnownProtos) { - const file = path.join(sourceDir, `${proto}.proto`); - const descriptor = Protobuf.loadSync(file).toJSON(); +// Protobuf.js exposes: any, duration, empty, field_mask, struct, timestamp, +// and wrappers. compiler/plugin is excluded in Protobuf.js and here. - Protobuf.common( - proto, - (descriptor.nested!.google as Protobuf.INamespace).nested! - ); - } -} +// Using constant strings for compatibility with tools like Webpack +const apiDescriptor = require('protobufjs/google/protobuf/api.json'); +const descriptorDescriptor = require('protobufjs/google/protobuf/descriptor.json'); +const sourceContextDescriptor = require('protobufjs/google/protobuf/source_context.json'); +const typeDescriptor = require('protobufjs/google/protobuf/type.json'); + +Protobuf.common( + 'api', + apiDescriptor.nested.google.nested.protobuf.nested +); +Protobuf.common( + 'descriptor', + descriptorDescriptor.nested.google.nested.protobuf.nested +); +Protobuf.common( + 'source_context', + sourceContextDescriptor.nested.google.nested.protobuf.nested +); +Protobuf.common( + 'type', + typeDescriptor.nested.google.nested.protobuf.nested +); diff --git a/packages/proto-loader/test_protos/well_known.proto b/packages/proto-loader/test_protos/well_known.proto index dd70402b..3ff2292f 100644 --- a/packages/proto-loader/test_protos/well_known.proto +++ b/packages/proto-loader/test_protos/well_known.proto @@ -19,3 +19,7 @@ import "google/protobuf/descriptor.proto"; extend google.protobuf.FieldOptions { bool redact = 52000; } + +message DescriptorHolder { + google.protobuf.DescriptorProto descriptor = 1; +} diff --git a/test/api/interop_extra_test.js b/test/api/interop_extra_test.js index b798f1a6..67130e03 100644 --- a/test/api/interop_extra_test.js +++ b/test/api/interop_extra_test.js @@ -144,6 +144,30 @@ describe(`${anyGrpc.clientName} client -> ${anyGrpc.serverName} server`, functio done(); }); }); + it('should receive all messages in a long stream', function(done) { + this.timeout(20000); + var arg = { + response_type: 'COMPRESSABLE', + response_parameters: [ + ] + }; + for (let i = 0; i < 100000; i++) { + arg.response_parameters.push({size: 0}); + } + var call = client.streamingOutputCall(arg); + let responseCount = 0; + call.on('data', (value) => { + responseCount++; + }); + call.on('status', (status) => { + assert.strictEqual(status.code, grpc.status.OK); + assert.strictEqual(responseCount, arg.response_parameters.length); + done(); + }); + call.on('error', (error) => { + assert.ifError(error); + }); + }); describe('max message size', function() { // A size that is larger than the default limit const largeMessageSize = 8 * 1024 * 1024; diff --git a/test/interop/async_delay_queue.js b/test/interop/async_delay_queue.js index 43ac5738..f9a39b59 100644 --- a/test/interop/async_delay_queue.js +++ b/test/interop/async_delay_queue.js @@ -39,9 +39,13 @@ AsyncDelayQueue.prototype.runNext = function() { var continueCallback = _.bind(this.runNext, this); if (next) { this.callback_pending = true; - setTimeout(function() { + if (next.delay === 0) { next.callback(continueCallback); - }, next.delay); + } else { + setTimeout(function() { + next.callback(continueCallback); + }, next.delay); + } } else { this.callback_pending = false; }