Merge branch 'master' into grpc-js_eds_lb_policy

This commit is contained in:
Michael Lumish 2020-07-16 10:02:18 -07:00
commit 31b297992b
19 changed files with 964 additions and 51 deletions

View File

@ -1,6 +1,6 @@
{
"name": "@grpc/grpc-js",
"version": "1.0.5",
"version": "1.1.2",
"description": "gRPC Library for Node - pure JS implementation",
"homepage": "https://grpc.io/",
"repository": "https://github.com/grpc/grpc-node/tree/master/packages/grpc-js",
@ -60,9 +60,6 @@
"dependencies": {
"semver": "^6.2.0"
},
"peerDependencies": {
"google-auth-library": "5.x || 6.x"
},
"files": [
"src/**/*.ts",
"build/src/*.{js,d.ts,js.map}",

View File

@ -20,6 +20,7 @@ import { Channel } from './channel';
import { BaseFilter, Filter, FilterFactory } from './filter';
import { Metadata } from './metadata';
import { Status } from './constants';
import { splitHostPort } from './uri-parser';
export class CallCredentialsFilter extends BaseFilter implements Filter {
private serviceUrl: string;
@ -38,9 +39,10 @@ export class CallCredentialsFilter extends BaseFilter implements Filter {
if (splitPath.length >= 2) {
serviceName = splitPath[1];
}
const hostname = splitHostPort(stream.getHost())?.host ?? 'localhost';
/* Currently, call credentials are only allowed on HTTPS connections, so we
* can assume that the scheme is "https" */
this.serviceUrl = `https://${stream.getHost()}/${serviceName}`;
this.serviceUrl = `https://${hostname}/${serviceName}`;
}
async sendMetadata(metadata: Promise<Metadata>): Promise<Metadata> {

View File

@ -541,6 +541,15 @@ export class Http2CallStream implements Call {
code = Status.PERMISSION_DENIED;
details = 'Protocol not secure enough';
break;
case http2.constants.NGHTTP2_INTERNAL_ERROR:
code = Status.INTERNAL;
/* This error code was previously handled in the default case, and
* there are several instances of it online, so I wanted to
* preserve the original error message so that people find existing
* information in searches, but also include the more recognizable
* "Internal server error" message. */
details = `Received RST_STREAM with code ${stream.rstCode} (Internal server error)`;
break;
default:
code = Status.INTERNAL;
details = `Received RST_STREAM with code ${stream.rstCode}`;

View File

@ -558,9 +558,7 @@ export class Client {
},
// eslint-disable-next-line @typescript-eslint/no-explicit-any
onReceiveMessage(message: any) {
if (stream.push(message)) {
call.startRead();
}
stream.push(message);
},
onReceiveStatus(status: StatusObject) {
if (receivedStatus) {
@ -656,9 +654,7 @@ export class Client {
stream.emit('metadata', metadata);
},
onReceiveMessage(message: Buffer) {
if (stream.push(message)) {
call.startRead();
}
stream.push(message)
},
onReceiveStatus(status: StatusObject) {
if (receivedStatus) {

View File

@ -41,8 +41,20 @@ export enum LogVerbosity {
ERROR,
}
/**
* NOTE: This enum is not currently used in any implemented API in this
* library. It is included only for type parity with the other implementation.
*/
export enum Propagate {
DEADLINE = 1,
CENSUS_STATS_CONTEXT = 2,
CENSUS_TRACING_CONTEXT = 4,
CANCELLATION = 8,
DEFAULTS = 65536,
}
// -1 means unlimited
export const DEFAULT_MAX_SEND_MESSAGE_LENGTH = -1;
// 4 MB default
export const DEFAULT_MAX_RECEIVE_MESSAGE_LENGTH = 4 * 1024 * 1024;
export const DEFAULT_MAX_RECEIVE_MESSAGE_LENGTH = 4 * 1024 * 1024;

View File

@ -36,7 +36,7 @@ import {
CallProperties,
UnaryCallback,
} from './client';
import { LogVerbosity, Status } from './constants';
import { LogVerbosity, Status, Propagate } from './constants';
import * as logging from './logging';
import {
Deserialize,
@ -127,6 +127,7 @@ export {
LogVerbosity as logVerbosity,
Status as status,
ConnectivityState as connectivityState,
Propagate as propagate,
// TODO: Other constants as well
};

View File

@ -0,0 +1,467 @@
/*
* Copyright 2020 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
import {
LoadBalancer,
ChannelControlHelper,
getFirstUsableConfig,
registerLoadBalancerType,
} from './load-balancer';
import { SubchannelAddress } from './subchannel';
import {
LoadBalancingConfig,
isPriorityLoadBalancingConfig,
} from './load-balancing-config';
import { ConnectivityState } from './channel';
import { Picker, QueuePicker, UnavailablePicker } from './picker';
import { ChildLoadBalancerHandler } from './load-balancer-child-handler';
import { ChannelOptions } from './channel-options';
import { Status } from './constants';
import { Metadata } from './metadata';
const TYPE_NAME = 'priority';
const DEFAULT_FAILOVER_TIME_MS = 10_000;
const DEFAULT_RETENTION_INTERVAL_MS = 15 * 60 * 1000;
export type LocalitySubchannelAddress = SubchannelAddress & {
localityPath: string[];
};
export function isLocalitySubchannelAddress(
address: SubchannelAddress
): address is LocalitySubchannelAddress {
return Array.isArray((address as LocalitySubchannelAddress).localityPath);
}
interface PriorityChildBalancer {
updateAddressList(
addressList: SubchannelAddress[],
lbConfig: LoadBalancingConfig,
attributes: { [key: string]: unknown }
): void;
exitIdle(): void;
resetBackoff(): void;
deactivate(): void;
maybeReactivate(): void;
cancelFailoverTimer(): void;
isFailoverTimerPending(): boolean;
getConnectivityState(): ConnectivityState;
getPicker(): Picker;
getName(): string;
destroy(): void;
}
interface UpdateArgs {
subchannelAddress: SubchannelAddress[];
lbConfig: LoadBalancingConfig;
}
export class PriorityLoadBalancer implements LoadBalancer {
/**
* Inner class for holding a child priority and managing associated timers.
*/
private PriorityChildImpl = class implements PriorityChildBalancer {
private connectivityState: ConnectivityState = ConnectivityState.IDLE;
private picker: Picker;
private childBalancer: ChildLoadBalancerHandler;
private failoverTimer: NodeJS.Timer | null = null;
private deactivationTimer: NodeJS.Timer | null = null;
constructor(private parent: PriorityLoadBalancer, private name: string) {
this.childBalancer = new ChildLoadBalancerHandler({
createSubchannel: (
subchannelAddress: SubchannelAddress,
subchannelArgs: ChannelOptions
) => {
return this.parent.channelControlHelper.createSubchannel(
subchannelAddress,
subchannelArgs
);
},
updateState: (connectivityState: ConnectivityState, picker: Picker) => {
this.updateState(connectivityState, picker);
},
requestReresolution: () => {
this.parent.channelControlHelper.requestReresolution();
},
});
this.picker = new QueuePicker(this.childBalancer);
}
private updateState(connectivityState: ConnectivityState, picker: Picker) {
this.connectivityState = connectivityState;
this.picker = picker;
this.parent.onChildStateChange(this);
}
private startFailoverTimer() {
if (this.failoverTimer === null) {
this.failoverTimer = setTimeout(() => {
this.failoverTimer = null;
this.updateState(
ConnectivityState.TRANSIENT_FAILURE,
new UnavailablePicker()
);
}, DEFAULT_FAILOVER_TIME_MS);
}
}
updateAddressList(
addressList: SubchannelAddress[],
lbConfig: LoadBalancingConfig,
attributes: { [key: string]: unknown }
): void {
this.childBalancer.updateAddressList(addressList, lbConfig, attributes);
this.startFailoverTimer();
}
exitIdle() {
if (this.connectivityState === ConnectivityState.IDLE) {
this.startFailoverTimer();
}
this.childBalancer.exitIdle();
}
resetBackoff() {
this.childBalancer.resetBackoff();
}
deactivate() {
if (this.deactivationTimer === null) {
this.deactivationTimer = setTimeout(() => {
this.parent.deleteChild(this);
this.childBalancer.destroy();
}, DEFAULT_RETENTION_INTERVAL_MS);
}
}
maybeReactivate() {
if (this.deactivationTimer !== null) {
clearTimeout(this.deactivationTimer);
this.deactivationTimer = null;
}
}
cancelFailoverTimer() {
if (this.failoverTimer !== null) {
clearTimeout(this.failoverTimer);
this.failoverTimer = null;
}
}
isFailoverTimerPending() {
return this.failoverTimer !== null;
}
getConnectivityState() {
return this.connectivityState;
}
getPicker() {
return this.picker;
}
getName() {
return this.name;
}
destroy() {
this.childBalancer.destroy();
}
};
// End of inner class PriorityChildImpl
private children: Map<string, PriorityChildBalancer> = new Map<
string,
PriorityChildBalancer
>();
/**
* The priority order of child names from the latest config update.
*/
private priorities: string[] = [];
/**
* The attributes object from the latest update, saved to be passed along to
* each new child as they are created
*/
private latestAttributes: { [key: string]: unknown } = {};
/**
* The latest load balancing policies and address lists for each child from
* the latest update
*/
private latestUpdates: Map<string, UpdateArgs> = new Map<
string,
UpdateArgs
>();
/**
* Current chosen priority that requests are sent to
*/
private currentPriority: number | null = null;
/**
* After an update, this preserves the currently selected child from before
* the update. We continue to use that child until it disconnects, or
* another higher-priority child connects, or it is deleted because it is not
* in the new priority list at all and its retention interval has expired, or
* we try and fail to connect to every child in the new priority list.
*/
private currentChildFromBeforeUpdate: PriorityChildBalancer | null = null;
constructor(private channelControlHelper: ChannelControlHelper) {}
private updateState(state: ConnectivityState, picker: Picker) {
/* If switching to IDLE, use a QueuePicker attached to this load balancer
* so that when the picker calls exitIdle, that in turn calls exitIdle on
* the PriorityChildImpl, which will start the failover timer. */
if (state === ConnectivityState.IDLE) {
picker = new QueuePicker(this);
}
this.channelControlHelper.updateState(state, picker);
}
private onChildStateChange(child: PriorityChildBalancer) {
const childState = child.getConnectivityState();
if (child === this.currentChildFromBeforeUpdate) {
if (
childState === ConnectivityState.READY ||
childState === ConnectivityState.IDLE
) {
this.updateState(childState, child.getPicker());
} else {
this.currentChildFromBeforeUpdate = null;
this.tryNextPriority(true);
}
return;
}
const childPriority = this.priorities.indexOf(child.getName());
if (childPriority < 0) {
// child is not in the priority list, ignore updates
return;
}
if (this.currentPriority !== null && childPriority > this.currentPriority) {
// child is lower priority than the currently selected child, ignore updates
return;
}
if (childState === ConnectivityState.TRANSIENT_FAILURE) {
/* Report connecting if and only if the currently selected child is the
* one entering TRANSIENT_FAILURE */
this.tryNextPriority(childPriority === this.currentPriority);
return;
}
if (this.currentPriority === null || childPriority < this.currentPriority) {
/* In this case, either there is no currently selected child or this
* child is higher priority than the currently selected child, so we want
* to switch to it if it is READY or IDLE. */
if (
childState === ConnectivityState.READY ||
childState === ConnectivityState.IDLE
) {
this.selectPriority(childPriority);
}
return;
}
/* The currently selected child has updated state to something other than
* TRANSIENT_FAILURE, so we pass that update along */
this.updateState(childState, child.getPicker());
}
private deleteChild(child: PriorityChildBalancer) {
if (child === this.currentChildFromBeforeUpdate) {
this.currentChildFromBeforeUpdate = null;
/* If we get to this point, the currentChildFromBeforeUpdate was still in
* use, so we are still trying to connect to the specified priorities */
this.tryNextPriority(true);
}
}
/**
* Select the child at the specified priority, and report that child's state
* as this balancer's state until that child disconnects or a higher-priority
* child connects.
* @param priority
*/
private selectPriority(priority: number) {
this.currentPriority = priority;
const chosenChild = this.children.get(this.priorities[priority])!;
this.updateState(
chosenChild.getConnectivityState(),
chosenChild.getPicker()
);
this.currentChildFromBeforeUpdate = null;
// Deactivate each child of lower priority than the chosen child
for (let i = priority + 1; i < this.priorities.length; i++) {
this.children.get(this.priorities[i])?.deactivate();
}
}
/**
* Check each child in priority order until we find one to use
* @param reportConnecting Whether we should report a CONNECTING state if we
* stop before picking a specific child. This should be true when we have
* not already selected a child.
*/
private tryNextPriority(reportConnecting: boolean) {
for (const [index, childName] of this.priorities.entries()) {
let child = this.children.get(childName);
/* If the child doesn't already exist, create it and update it. */
if (child === undefined) {
if (reportConnecting) {
this.updateState(ConnectivityState.CONNECTING, new QueuePicker(this));
}
child = new this.PriorityChildImpl(this, childName);
this.children.set(childName, child);
const childUpdate = this.latestUpdates.get(childName);
if (childUpdate !== undefined) {
child.updateAddressList(
childUpdate.subchannelAddress,
childUpdate.lbConfig,
this.latestAttributes
);
}
}
/* We're going to try to use this child, so reactivate it if it has been
* deactivated */
child.maybeReactivate();
if (
child.getConnectivityState() === ConnectivityState.READY ||
child.getConnectivityState() === ConnectivityState.IDLE
) {
this.selectPriority(index);
return;
}
if (child.isFailoverTimerPending()) {
/* This child is still trying to connect. Wait until its failover timer
* has ended to continue to the next one */
if (reportConnecting) {
this.updateState(ConnectivityState.CONNECTING, new QueuePicker(this));
}
return;
}
}
this.currentPriority = null;
this.currentChildFromBeforeUpdate = null;
this.updateState(
ConnectivityState.TRANSIENT_FAILURE,
new UnavailablePicker({
code: Status.UNAVAILABLE,
details: 'No ready priority',
metadata: new Metadata(),
})
);
}
updateAddressList(
addressList: SubchannelAddress[],
lbConfig: LoadBalancingConfig,
attributes: { [key: string]: unknown }
): void {
if (!isPriorityLoadBalancingConfig(lbConfig)) {
// Reject a config of the wrong type
return;
}
const priorityConfig = lbConfig.priority;
/* For each address, the first element of its localityPath array determines
* which child it belongs to. So we bucket those addresses by that first
* element, and pass along the rest of the localityPath for that child
* to use. */
const childAddressMap: Map<string, LocalitySubchannelAddress[]> = new Map<
string,
LocalitySubchannelAddress[]
>();
for (const address of addressList) {
if (!isLocalitySubchannelAddress(address)) {
// Reject address that cannot be prioritized
return;
}
if (address.localityPath.length < 1) {
// Reject address that cannot be prioritized
return;
}
const childName = address.localityPath[0];
const childAddress: LocalitySubchannelAddress = {
...address,
localityPath: address.localityPath.slice(1),
};
let childAddressList = childAddressMap.get(childName);
if (childAddressList === undefined) {
childAddressList = [];
childAddressMap.set(childName, childAddressList);
}
childAddressList.push(childAddress);
}
if (this.currentPriority !== null) {
this.currentChildFromBeforeUpdate = this.children.get(
this.priorities[this.currentPriority]
)!;
this.currentPriority = null;
}
this.latestAttributes = attributes;
this.latestUpdates.clear();
this.priorities = priorityConfig.priorities;
/* Pair up the new child configs with the corresponding address lists, and
* update all existing children with their new configs */
for (const [childName, childConfig] of priorityConfig.children) {
const chosenChildConfig = getFirstUsableConfig(childConfig.config);
if (chosenChildConfig !== null) {
const childAddresses = childAddressMap.get(childName) ?? [];
this.latestUpdates.set(childName, {
subchannelAddress: childAddresses,
lbConfig: chosenChildConfig,
});
const existingChild = this.children.get(childName);
if (existingChild !== undefined) {
existingChild.updateAddressList(
childAddresses,
chosenChildConfig,
attributes
);
}
}
}
// Deactivate all children that are no longer in the priority list
for (const [childName, child] of this.children) {
if (this.priorities.indexOf(childName) < 0) {
child.deactivate();
}
}
// Only report connecting if there are no existing children
this.tryNextPriority(this.children.size === 0);
}
exitIdle(): void {
if (this.currentPriority !== null) {
this.children.get(this.priorities[this.currentPriority])?.exitIdle();
}
}
resetBackoff(): void {
for (const child of this.children.values()) {
child.resetBackoff();
}
}
destroy(): void {
for (const child of this.children.values()) {
child.destroy();
}
this.children.clear();
this.currentChildFromBeforeUpdate?.destroy();
this.currentChildFromBeforeUpdate = null;
}
getTypeName(): string {
return TYPE_NAME;
}
}
export function setup() {
registerLoadBalancerType(TYPE_NAME, PriorityLoadBalancer);
}

View File

@ -0,0 +1,322 @@
/*
* Copyright 2020 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
import { LoadBalancer, ChannelControlHelper, getFirstUsableConfig, registerLoadBalancerType } from "./load-balancer";
import { SubchannelAddress } from "./subchannel";
import { LoadBalancingConfig, WeightedTarget, isWeightedTargetLoadBalancingConfig } from "./load-balancing-config";
import { Picker, PickResult, PickArgs, QueuePicker, UnavailablePicker } from "./picker";
import { ConnectivityState } from "./channel";
import { ChildLoadBalancerHandler } from "./load-balancer-child-handler";
import { Status } from "./constants";
import { Metadata } from "./metadata";
import { isLocalitySubchannelAddress, LocalitySubchannelAddress } from "./load-balancer-priority";
const TYPE_NAME = 'weighted_target';
const DEFAULT_RETENTION_INTERVAL_MS = 15 * 60 * 1000;
/**
* Represents a picker and a subinterval of a larger interval used for randomly
* selecting an element of a list of these objects.
*/
interface WeightedPicker {
picker: Picker;
/**
* The exclusive end of the interval associated with this element. The start
* of the interval is implicitly the rangeEnd of the previous element in the
* list, or 0 for the first element in the list.
*/
rangeEnd: number;
}
class WeightedTargetPicker implements Picker {
private rangeTotal: number;
constructor(private readonly pickerList: WeightedPicker[]) {
this.rangeTotal = pickerList[pickerList.length - 1].rangeEnd;
}
pick(pickArgs: PickArgs): PickResult {
// num | 0 is equivalent to floor(num)
const selection = (Math.random() * this.rangeTotal) | 0;
/* Binary search for the element of the list such that
* pickerList[index - 1].rangeEnd <= selection < pickerList[index].rangeEnd
*/
let mid = 0;
let startIndex = 0;
let endIndex = this.pickerList.length - 1;
let index = 0;
while (endIndex > startIndex) {
mid = ((startIndex + endIndex) / 2) | 0;
if (this.pickerList[mid].rangeEnd > selection) {
endIndex = mid;
} else if (this.pickerList[mid].rangeEnd < selection) {
startIndex = mid + 1;
} else {
// + 1 here because the range is exclusive at the top end
index = mid + 1;
break;
}
}
if (index === 0) {
index = startIndex;
}
return this.pickerList[index].picker.pick(pickArgs);
}
}
interface WeightedChild {
updateAddressList(addressList: SubchannelAddress[], lbConfig: WeightedTarget, attributes: { [key: string]: unknown; }): void;
exitIdle(): void;
resetBackoff(): void;
destroy(): void;
deactivate(): void;
maybeReactivate(): void;
getConnectivityState(): ConnectivityState;
getPicker(): Picker;
getWeight(): number;
}
export class WeightedTargetLoadBalancer implements LoadBalancer {
private WeightedChildImpl = class implements WeightedChild {
private connectivityState: ConnectivityState = ConnectivityState.IDLE;
private picker: Picker;
private childBalancer: ChildLoadBalancerHandler;
private deactivationTimer: NodeJS.Timer | null = null;
private weight: number = 0;
constructor(private parent: WeightedTargetLoadBalancer, private name: string) {
this.childBalancer = new ChildLoadBalancerHandler({
createSubchannel: (subchannelAddress, subchannelOptions) => {
return this.parent.channelControlHelper.createSubchannel(subchannelAddress, subchannelOptions);
},
updateState: (connectivityState, picker) => {
this.updateState(connectivityState, picker);
},
requestReresolution: () => {
this.parent.channelControlHelper.requestReresolution();
}
});
this.picker = new QueuePicker(this.childBalancer);
}
private updateState(connectivityState: ConnectivityState, picker: Picker) {
this.connectivityState = connectivityState;
this.picker = picker;
this.parent.updateState();
}
updateAddressList(addressList: SubchannelAddress[], lbConfig: WeightedTarget, attributes: { [key: string]: unknown; }): void {
this.weight = lbConfig.weight;
const childConfig = getFirstUsableConfig(lbConfig.child_policy);
if (childConfig !== null) {
this.childBalancer.updateAddressList(addressList, childConfig, attributes);
}
}
exitIdle(): void {
this.childBalancer.exitIdle();
}
resetBackoff(): void {
this.childBalancer.resetBackoff();
}
destroy(): void {
this.childBalancer.destroy();
if (this.deactivationTimer !== null) {
clearTimeout(this.deactivationTimer);
}
}
deactivate(): void {
if (this.deactivationTimer === null) {
this.deactivationTimer = setTimeout(() => {
this.parent.targets.delete(this.name);
this.deactivationTimer = null;
}, DEFAULT_RETENTION_INTERVAL_MS);
}
}
maybeReactivate(): void {
if (this.deactivationTimer !== null) {
clearTimeout(this.deactivationTimer);
this.deactivationTimer = null;
}
}
getConnectivityState(): ConnectivityState {
return this.connectivityState;
}
getPicker(): Picker {
return this.picker;
}
getWeight(): number {
return this.weight;
}
}
// end of WeightedChildImpl
/**
* Map of target names to target children. Includes current targets and
* previous targets with deactivation timers that have not yet triggered.
*/
private targets: Map<string, WeightedChild> = new Map<string, WeightedChild>();
/**
* List of current target names.
*/
private targetList: string[] = [];
constructor(private channelControlHelper: ChannelControlHelper) {}
private updateState() {
const pickerList: WeightedPicker[] = [];
let end = 0;
let connectingCount = 0;
let idleCount = 0;
let transientFailureCount = 0;
for (const targetName of this.targetList) {
const target = this.targets.get(targetName);
if (target === undefined) {
continue;
}
switch (target.getConnectivityState()) {
case ConnectivityState.READY:
end += target.getWeight();
pickerList.push({
picker: target.getPicker(),
rangeEnd: end
});
break;
case ConnectivityState.CONNECTING:
connectingCount += 1;
break;
case ConnectivityState.IDLE:
idleCount += 1;
break;
case ConnectivityState.TRANSIENT_FAILURE:
transientFailureCount += 1;
break;
default:
// Ignore the other possiblity, SHUTDOWN
}
}
let connectivityState: ConnectivityState;
if (pickerList.length > 0) {
connectivityState = ConnectivityState.READY;
} else if (connectingCount > 0) {
connectivityState = ConnectivityState.CONNECTING;
} else if (idleCount > 0) {
connectivityState = ConnectivityState.IDLE;
} else {
connectivityState = ConnectivityState.TRANSIENT_FAILURE;
}
let picker: Picker;
switch (connectivityState) {
case ConnectivityState.READY:
picker = new WeightedTargetPicker(pickerList);
break;
case ConnectivityState.CONNECTING:
case ConnectivityState.READY:
picker = new QueuePicker(this);
break;
default:
picker = new UnavailablePicker({
code: Status.UNAVAILABLE,
details: 'weighted_target: all children report state TRANSIENT_FAILURE',
metadata: new Metadata()
});
}
this.channelControlHelper.updateState(connectivityState, picker);
}
updateAddressList(addressList: SubchannelAddress[], lbConfig: LoadBalancingConfig, attributes: { [key: string]: unknown; }): void {
if (!isWeightedTargetLoadBalancingConfig(lbConfig)) {
// Reject a config of the wrong type
return;
}
/* For each address, the first element of its localityPath array determines
* which child it belongs to. So we bucket those addresses by that first
* element, and pass along the rest of the localityPath for that child
* to use. */
const childAddressMap = new Map<string, SubchannelAddress[]>();
for (const address of addressList) {
if (!isLocalitySubchannelAddress(address)) {
// Reject address that cannot be associated with targets
return;
}
if (address.localityPath.length < 1) {
// Reject address that cannot be associated with targets
return;
}
const childName = address.localityPath[0];
const childAddress: LocalitySubchannelAddress = {
...address,
localityPath: address.localityPath.slice(1),
};
let childAddressList = childAddressMap.get(childName);
if (childAddressList === undefined) {
childAddressList = [];
childAddressMap.set(childName, childAddressList);
}
childAddressList.push(childAddress);
}
this.targetList = Array.from(lbConfig.weighted_target.targets.keys());
for (const [targetName, targetConfig] of lbConfig.weighted_target.targets) {
let target = this.targets.get(targetName);
if (target === undefined) {
target = new this.WeightedChildImpl(this, targetName);
this.targets.set(targetName, target);
} else {
target.maybeReactivate();
}
target.updateAddressList(childAddressMap.get(targetName) ?? [], targetConfig, attributes);
}
// Deactivate targets that are not in the new config
for (const [targetName, target] of this.targets) {
if (this.targetList.indexOf(targetName) < 0) {
target.deactivate();
}
}
this.updateState();
}
exitIdle(): void {
for (const targetName of this.targetList) {
this.targets.get(targetName)?.exitIdle();
}
}
resetBackoff(): void {
for (const targetName of this.targetList) {
this.targets.get(targetName)?.resetBackoff();
}
}
destroy(): void {
for (const target of this.targets.values()) {
target.destroy();
}
this.targets.clear();
}
getTypeName(): string {
return TYPE_NAME;
}
}
export function setup() {
registerLoadBalancerType(TYPE_NAME, WeightedTargetLoadBalancer);
}

View File

@ -22,6 +22,8 @@ import { Picker } from './picker';
import { LoadBalancingConfig } from './load-balancing-config';
import * as load_balancer_pick_first from './load-balancer-pick-first';
import * as load_balancer_round_robin from './load-balancer-round-robin';
import * as load_balancer_priority from './load-balancer-priority';
import * as load_balancer_weighted_target from './load-balancer-weighted-target';
/**
* A collection of functions associated with a channel that a load balancer
@ -137,4 +139,6 @@ export function getFirstUsableConfig(
export function registerAll() {
load_balancer_pick_first.setup();
load_balancer_round_robin.setup();
load_balancer_priority.setup();
load_balancer_weighted_target.setup();
}

View File

@ -48,6 +48,15 @@ export interface PriorityLbConfig {
priorities: string[];
}
export interface WeightedTarget {
weight: number;
child_policy: LoadBalancingConfig[];
}
export interface WeightedTargetLbConfig {
targets: Map<string, WeightedTarget>;
}
export interface PickFirstLoadBalancingConfig {
name: 'pick_first';
pick_first: PickFirstConfig;
@ -73,12 +82,18 @@ export interface PriorityLoadBalancingConfig {
priority: PriorityLbConfig;
}
export interface WeightedTargetLoadBalancingConfig {
name: 'weighted_target';
weighted_target: WeightedTargetLbConfig;
}
export type LoadBalancingConfig =
| PickFirstLoadBalancingConfig
| RoundRobinLoadBalancingConfig
| XdsLoadBalancingConfig
| GrpcLbLoadBalancingConfig
| PriorityLoadBalancingConfig;
| PriorityLoadBalancingConfig
| WeightedTargetLoadBalancingConfig;
export function isRoundRobinLoadBalancingConfig(
lbconfig: LoadBalancingConfig
@ -104,6 +119,12 @@ export function isPriorityLoadBalancingConfig(
return lbconfig.name === 'priority';
}
export function isWeightedTargetLoadBalancingConfig(
lbconfig: LoadBalancingConfig
): lbconfig is WeightedTargetLoadBalancingConfig {
return lbconfig.name === 'weighted_target';
}
/* In these functions we assume the input came from a JSON object. Therefore we
* expect that the prototype is uninteresting and that `in` can be used
* effectively */

View File

@ -269,12 +269,7 @@ class DnsResolver implements Resolver {
* @param target
*/
static getDefaultAuthority(target: GrpcUri): string {
const hostPort = splitHostPort(target.path);
if (hostPort !== null) {
return hostPort.host;
} else {
throw new Error(`Failed to parse target ${uriToString(target)}`);
}
return target.path;
}
}

View File

@ -500,10 +500,11 @@ export class Server {
}
}
// If any sessions are active, close them gracefully.
pendingChecks += this.sessions.size;
this.sessions.forEach((session) => {
session.close(maybeCallback);
if (!session.closed) {
pendingChecks += 1;
session.close(maybeCallback);
}
});
if (pendingChecks === 0) {
callback();
@ -608,6 +609,10 @@ export class Server {
}
this.sessions.add(session);
session.on('close', () => {
this.sessions.delete(session);
});
});
}
}

View File

@ -28,7 +28,7 @@ import * as logging from './logging';
import { LogVerbosity } from './constants';
import { getProxiedConnection, ProxyConnectionResult } from './http_proxy';
import * as net from 'net';
import { GrpcUri } from './uri-parser';
import { GrpcUri, parseUri, splitHostPort } from './uri-parser';
import { ConnectionOptions } from 'tls';
import { FilterFactory, Filter } from './filter';
@ -286,6 +286,9 @@ export class Subchannel {
}
private createSession(proxyConnectionResult: ProxyConnectionResult) {
const targetAuthority = getDefaultAuthority(
proxyConnectionResult.realTarget ?? this.channelTarget
);
let connectionOptions: http2.SecureClientSessionOptions =
this.credentials._getConnectionOptions() || {};
let addressScheme = 'http://';
@ -305,8 +308,18 @@ export class Subchannel {
return checkServerIdentity(sslTargetNameOverride, cert);
};
connectionOptions.servername = sslTargetNameOverride;
} else {
const authorityHostname =
splitHostPort(targetAuthority)?.host ?? 'localhost';
// We want to always set servername to support SNI
connectionOptions.servername = authorityHostname;
}
if (proxyConnectionResult.socket) {
/* This is part of the workaround for
* https://github.com/nodejs/node/issues/32922. Without that bug,
* proxyConnectionResult.socket would always be a plaintext socket and
* this would say
* connectionOptions.socket = proxyConnectionResult.socket; */
connectionOptions.createConnection = (authority, option) => {
return proxyConnectionResult.socket!;
};
@ -350,10 +363,7 @@ export class Subchannel {
* determines whether the connection will be established over TLS or not.
*/
const session = http2.connect(
addressScheme +
getDefaultAuthority(
proxyConnectionResult.realTarget ?? this.channelTarget
),
addressScheme + targetAuthority,
connectionOptions
);
this.session = session;
@ -404,6 +414,11 @@ export class Subchannel {
KEEPALIVE_MAX_TIME_MS
);
}
trace(
this.subchannelAddress +
' connection closed by GOAWAY with code ' +
errorCode
);
this.transitionToState(
[ConnectivityState.CONNECTING, ConnectivityState.READY],
ConnectivityState.IDLE
@ -446,6 +461,18 @@ export class Subchannel {
return checkServerIdentity(sslTargetNameOverride, cert);
};
connectionOptions.servername = sslTargetNameOverride;
} else {
if ('grpc.http_connect_target' in this.options) {
/* This is more or less how servername will be set in createSession
* if a connection is successfully established through the proxy.
* If the proxy is not used, these connectionOptions are discarded
* anyway */
connectionOptions.servername = getDefaultAuthority(
parseUri(this.options['grpc.http_connect_target'] as string) ?? {
path: 'localhost',
}
);
}
}
}
@ -640,7 +667,24 @@ export class Subchannel {
headers[HTTP2_HEADER_METHOD] = 'POST';
headers[HTTP2_HEADER_PATH] = callStream.getMethod();
headers[HTTP2_HEADER_TE] = 'trailers';
const http2Stream = this.session!.request(headers);
let http2Stream: http2.ClientHttp2Stream;
/* In theory, if an error is thrown by session.request because session has
* become unusable (e.g. because it has received a goaway), this subchannel
* should soon see the corresponding close or goaway event anyway and leave
* READY. But we have seen reports that this does not happen
* (https://github.com/googleapis/nodejs-firestore/issues/1023#issuecomment-653204096)
* so for defense in depth, we just discard the session when we see an
* error here.
*/
try {
http2Stream = this.session!.request(headers);
} catch (e) {
this.transitionToState(
[ConnectivityState.READY],
ConnectivityState.TRANSIENT_FAILURE
);
throw e;
}
let headersString = '';
for (const header of Object.keys(headers)) {
headersString += '\t\t' + header + ': ' + headers[header] + '\n';

View File

@ -24,7 +24,7 @@
"prepublishOnly": "git submodule update --init --recursive && node copy_well_known_protos.js"
},
"dependencies": {
"node-pre-gyp": "^0.12.0"
"node-pre-gyp": "^0.15.0"
},
"binary": {
"module_name": "grpc_tools",

View File

@ -1,6 +1,6 @@
{
"name": "@grpc/proto-loader",
"version": "0.5.4",
"version": "0.5.5",
"author": "Google Inc.",
"contributors": [
{

View File

@ -369,23 +369,29 @@ export function loadSync(
}
// Load Google's well-known proto files that aren't exposed by Protobuf.js.
{
// Protobuf.js exposes: any, duration, empty, field_mask, struct, timestamp,
// and wrappers. compiler/plugin is excluded in Protobuf.js and here.
const wellKnownProtos = ['api', 'descriptor', 'source_context', 'type'];
const sourceDir = path.join(
path.dirname(require.resolve('protobufjs')),
'google',
'protobuf'
);
for (const proto of wellKnownProtos) {
const file = path.join(sourceDir, `${proto}.proto`);
const descriptor = Protobuf.loadSync(file).toJSON();
// Protobuf.js exposes: any, duration, empty, field_mask, struct, timestamp,
// and wrappers. compiler/plugin is excluded in Protobuf.js and here.
Protobuf.common(
proto,
(descriptor.nested!.google as Protobuf.INamespace).nested!
);
}
}
// Using constant strings for compatibility with tools like Webpack
const apiDescriptor = require('protobufjs/google/protobuf/api.json');
const descriptorDescriptor = require('protobufjs/google/protobuf/descriptor.json');
const sourceContextDescriptor = require('protobufjs/google/protobuf/source_context.json');
const typeDescriptor = require('protobufjs/google/protobuf/type.json');
Protobuf.common(
'api',
apiDescriptor.nested.google.nested.protobuf.nested
);
Protobuf.common(
'descriptor',
descriptorDescriptor.nested.google.nested.protobuf.nested
);
Protobuf.common(
'source_context',
sourceContextDescriptor.nested.google.nested.protobuf.nested
);
Protobuf.common(
'type',
typeDescriptor.nested.google.nested.protobuf.nested
);

View File

@ -19,3 +19,7 @@ import "google/protobuf/descriptor.proto";
extend google.protobuf.FieldOptions {
bool redact = 52000;
}
message DescriptorHolder {
google.protobuf.DescriptorProto descriptor = 1;
}

View File

@ -144,6 +144,30 @@ describe(`${anyGrpc.clientName} client -> ${anyGrpc.serverName} server`, functio
done();
});
});
it('should receive all messages in a long stream', function(done) {
this.timeout(20000);
var arg = {
response_type: 'COMPRESSABLE',
response_parameters: [
]
};
for (let i = 0; i < 100000; i++) {
arg.response_parameters.push({size: 0});
}
var call = client.streamingOutputCall(arg);
let responseCount = 0;
call.on('data', (value) => {
responseCount++;
});
call.on('status', (status) => {
assert.strictEqual(status.code, grpc.status.OK);
assert.strictEqual(responseCount, arg.response_parameters.length);
done();
});
call.on('error', (error) => {
assert.ifError(error);
});
});
describe('max message size', function() {
// A size that is larger than the default limit
const largeMessageSize = 8 * 1024 * 1024;

View File

@ -39,9 +39,13 @@ AsyncDelayQueue.prototype.runNext = function() {
var continueCallback = _.bind(this.runNext, this);
if (next) {
this.callback_pending = true;
setTimeout(function() {
if (next.delay === 0) {
next.callback(continueCallback);
}, next.delay);
} else {
setTimeout(function() {
next.callback(continueCallback);
}, next.delay);
}
} else {
this.callback_pending = false;
}