From 08dd1149518de14159b554347cbb0818db09ae39 Mon Sep 17 00:00:00 2001 From: Michael Lumish Date: Fri, 24 Apr 2020 11:34:26 -0700 Subject: [PATCH 1/4] grpc-js: Add attributes argument passed from resolver to load balancer --- packages/grpc-js/src/load-balancer.ts | 3 ++- packages/grpc-js/src/resolver-dns.ts | 11 +++++++---- packages/grpc-js/src/resolver-uds.ts | 3 ++- packages/grpc-js/src/resolver.ts | 3 ++- packages/grpc-js/src/resolving-load-balancer.ts | 12 ++++++++---- 5 files changed, 21 insertions(+), 11 deletions(-) diff --git a/packages/grpc-js/src/load-balancer.ts b/packages/grpc-js/src/load-balancer.ts index 5fa4bdc5..b1710467 100644 --- a/packages/grpc-js/src/load-balancer.ts +++ b/packages/grpc-js/src/load-balancer.ts @@ -67,7 +67,8 @@ export interface LoadBalancer { */ updateAddressList( addressList: SubchannelAddress[], - lbConfig: LoadBalancingConfig | null + lbConfig: LoadBalancingConfig | null, + attributes: {[key: string]: unknown} ): void; /** * If the load balancer is currently in the IDLE state, start connecting. diff --git a/packages/grpc-js/src/resolver-dns.ts b/packages/grpc-js/src/resolver-dns.ts index 274d8c34..016f3bd1 100644 --- a/packages/grpc-js/src/resolver-dns.ts +++ b/packages/grpc-js/src/resolver-dns.ts @@ -124,7 +124,7 @@ class DnsResolver implements Resolver { if (this.ipResult !== null) { trace('Returning IP address for target ' + uriToString(this.target)); setImmediate(() => { - this.listener.onSuccessfulResolution(this.ipResult!, null, null); + this.listener.onSuccessfulResolution(this.ipResult!, null, null, {}); }); return; } @@ -186,7 +186,8 @@ class DnsResolver implements Resolver { this.listener.onSuccessfulResolution( this.latestLookupResult, this.latestServiceConfig, - this.latestServiceConfigError + this.latestServiceConfigError, + {} ); }, (err) => { @@ -230,7 +231,8 @@ class DnsResolver implements Resolver { this.listener.onSuccessfulResolution( this.latestLookupResult, this.latestServiceConfig, - this.latestServiceConfigError + this.latestServiceConfigError, + {} ); } }, @@ -244,7 +246,8 @@ class DnsResolver implements Resolver { this.listener.onSuccessfulResolution( this.latestLookupResult, this.latestServiceConfig, - this.latestServiceConfigError + this.latestServiceConfigError, + {} ); } } diff --git a/packages/grpc-js/src/resolver-uds.ts b/packages/grpc-js/src/resolver-uds.ts index 759cb233..0bc92991 100644 --- a/packages/grpc-js/src/resolver-uds.ts +++ b/packages/grpc-js/src/resolver-uds.ts @@ -38,7 +38,8 @@ class UdsResolver implements Resolver { this.listener.onSuccessfulResolution, this.addresses, null, - null + null, + {} ); } diff --git a/packages/grpc-js/src/resolver.ts b/packages/grpc-js/src/resolver.ts index 7d6d1ad5..cef0c1f4 100644 --- a/packages/grpc-js/src/resolver.ts +++ b/packages/grpc-js/src/resolver.ts @@ -39,7 +39,8 @@ export interface ResolverListener { onSuccessfulResolution( addressList: SubchannelAddress[], serviceConfig: ServiceConfig | null, - serviceConfigError: StatusObject | null + serviceConfigError: StatusObject | null, + attributes: {[key: string]: unknown} ): void; /** * Called whenever a name resolution attempt fails. diff --git a/packages/grpc-js/src/resolving-load-balancer.ts b/packages/grpc-js/src/resolving-load-balancer.ts index 48f1ddd4..75ea6eaa 100644 --- a/packages/grpc-js/src/resolving-load-balancer.ts +++ b/packages/grpc-js/src/resolving-load-balancer.ts @@ -136,7 +136,8 @@ export class ResolvingLoadBalancer implements LoadBalancer { onSuccessfulResolution: ( addressList: SubchannelAddress[], serviceConfig: ServiceConfig | null, - serviceConfigError: ServiceError | null + serviceConfigError: ServiceError | null, + attributes: {[key: string]: unknown} ) => { let workingServiceConfig: ServiceConfig | null = null; /* This first group of conditionals implements the algorithm described @@ -211,12 +212,14 @@ export class ResolvingLoadBalancer implements LoadBalancer { )!; this.innerLoadBalancer.updateAddressList( addressList, - loadBalancingConfig + loadBalancingConfig, + attributes ); } else if (this.innerLoadBalancer.getTypeName() === loadBalancerName) { this.innerLoadBalancer.updateAddressList( addressList, - loadBalancingConfig + loadBalancingConfig, + attributes ); } else { if ( @@ -234,7 +237,8 @@ export class ResolvingLoadBalancer implements LoadBalancer { } this.pendingReplacementLoadBalancer.updateAddressList( addressList, - loadBalancingConfig + loadBalancingConfig, + attributes ); } }, From 3d4a27e6cc3f19ae660a076b63077942721f8441 Mon Sep 17 00:00:00 2001 From: Michael Lumish Date: Fri, 24 Apr 2020 14:00:54 -0700 Subject: [PATCH 2/4] Plumb through an extra filter from the load balancer to the call stream --- packages/grpc-js/src/call-stream.ts | 10 +++++++--- packages/grpc-js/src/channel.ts | 3 ++- packages/grpc-js/src/picker.ts | 12 ++++++++++++ packages/grpc-js/src/subchannel.ts | 5 +++-- 4 files changed, 24 insertions(+), 6 deletions(-) diff --git a/packages/grpc-js/src/call-stream.ts b/packages/grpc-js/src/call-stream.ts index ca3049e7..3a853315 100644 --- a/packages/grpc-js/src/call-stream.ts +++ b/packages/grpc-js/src/call-stream.ts @@ -19,8 +19,8 @@ import * as http2 from 'http2'; import { CallCredentials } from './call-credentials'; import { Status } from './constants'; -import { Filter } from './filter'; -import { FilterStackFactory } from './filter-stack'; +import { Filter, FilterFactory } from './filter'; +import { FilterStackFactory, FilterStack } from './filter-stack'; import { Metadata } from './metadata'; import { StreamDecoder } from './stream-decoder'; import { ChannelImplementation } from './channel'; @@ -407,8 +407,12 @@ export class Http2CallStream implements Call { attachHttp2Stream( stream: http2.ClientHttp2Stream, - subchannel: Subchannel + subchannel: Subchannel, + extraFilterFactory?: FilterFactory ): void { + if (extraFilterFactory !== undefined) { + this.filterStack = new FilterStack([this.filterStack, extraFilterFactory.createFilter(this)]); + } if (this.finalStatus !== null) { stream.close(NGHTTP2_CANCEL); } else { diff --git a/packages/grpc-js/src/channel.ts b/packages/grpc-js/src/channel.ts index f2a7bc09..f4df2fce 100644 --- a/packages/grpc-js/src/channel.ts +++ b/packages/grpc-js/src/channel.ts @@ -300,7 +300,8 @@ export class ChannelImplementation implements Channel { try { pickResult.subchannel!.startCallStream( finalMetadata, - callStream + callStream, + pickResult.extraFilterFactory ?? undefined ); } catch (error) { if ( diff --git a/packages/grpc-js/src/picker.ts b/packages/grpc-js/src/picker.ts index d908f026..9470f122 100644 --- a/packages/grpc-js/src/picker.ts +++ b/packages/grpc-js/src/picker.ts @@ -20,6 +20,7 @@ import { StatusObject } from './call-stream'; import { Metadata } from './metadata'; import { Status } from './constants'; import { LoadBalancer } from './load-balancer'; +import { FilterFactory, Filter } from './filter'; export enum PickResultType { COMPLETE, @@ -40,24 +41,33 @@ export interface PickResult { * `pickResultType` is TRANSIENT_FAILURE. */ status: StatusObject | null; + /** + * Extra FilterFactory (can be multiple encapsulated in a FilterStackFactory) + * provided by the load balancer to be used with the call. For technical + * reasons filters from this factory will not see sendMetadata events. + */ + extraFilterFactory: FilterFactory | null; } export interface CompletePickResult extends PickResult { pickResultType: PickResultType.COMPLETE; subchannel: Subchannel | null; status: null; + extraFilterFactory: FilterFactory | null; } export interface QueuePickResult extends PickResult { pickResultType: PickResultType.QUEUE; subchannel: null; status: null; + extraFilterFactory: null; } export interface TransientFailurePickResult extends PickResult { pickResultType: PickResultType.TRANSIENT_FAILURE; subchannel: null; status: StatusObject; + extraFilterFactory: null; } export interface PickArgs { @@ -95,6 +105,7 @@ export class UnavailablePicker implements Picker { pickResultType: PickResultType.TRANSIENT_FAILURE, subchannel: null, status: this.status, + extraFilterFactory: null }; } } @@ -122,6 +133,7 @@ export class QueuePicker { pickResultType: PickResultType.QUEUE, subchannel: null, status: null, + extraFilterFactory: null }; } } diff --git a/packages/grpc-js/src/subchannel.ts b/packages/grpc-js/src/subchannel.ts index 2af7885e..713f0d9a 100644 --- a/packages/grpc-js/src/subchannel.ts +++ b/packages/grpc-js/src/subchannel.ts @@ -30,6 +30,7 @@ import { getProxiedConnection, ProxyConnectionResult } from './http_proxy'; import * as net from 'net'; import { GrpcUri } from './uri-parser'; import { ConnectionOptions } from 'tls'; +import { FilterFactory, Filter } from './filter'; const clientVersion = require('../../package.json').version; @@ -619,7 +620,7 @@ export class Subchannel { * @param metadata * @param callStream */ - startCallStream(metadata: Metadata, callStream: Http2CallStream) { + startCallStream(metadata: Metadata, callStream: Http2CallStream, extraFilterFactory?: FilterFactory) { const headers = metadata.toHttp2Headers(); headers[HTTP2_HEADER_AUTHORITY] = callStream.getHost(); headers[HTTP2_HEADER_USER_AGENT] = this.userAgent; @@ -633,7 +634,7 @@ export class Subchannel { headersString += '\t\t' + header + ': ' + headers[header] + '\n'; } trace('Starting stream with headers\n' + headersString); - callStream.attachHttp2Stream(http2Stream, this); + callStream.attachHttp2Stream(http2Stream, this, extraFilterFactory); } /** From 424c9bfe70601cccb0698341b95fa6b6a9d29283 Mon Sep 17 00:00:00 2001 From: Michael Lumish Date: Mon, 27 Apr 2020 10:24:15 -0700 Subject: [PATCH 3/4] Add onCallStarted field to Pick object --- packages/grpc-js/src/channel.ts | 3 +++ packages/grpc-js/src/load-balancer-pick-first.ts | 2 ++ packages/grpc-js/src/load-balancer-round-robin.ts | 2 ++ packages/grpc-js/src/picker.ts | 10 ++++++++-- 4 files changed, 15 insertions(+), 2 deletions(-) diff --git a/packages/grpc-js/src/channel.ts b/packages/grpc-js/src/channel.ts index f4df2fce..b240cc43 100644 --- a/packages/grpc-js/src/channel.ts +++ b/packages/grpc-js/src/channel.ts @@ -303,6 +303,9 @@ export class ChannelImplementation implements Channel { callStream, pickResult.extraFilterFactory ?? undefined ); + /* If we reach this point, the call stream has started + * successfully */ + pickResult.onCallStarted?.(); } catch (error) { if ( (error as NodeJS.ErrnoException).code === diff --git a/packages/grpc-js/src/load-balancer-pick-first.ts b/packages/grpc-js/src/load-balancer-pick-first.ts index 6b9756ff..6289123d 100644 --- a/packages/grpc-js/src/load-balancer-pick-first.ts +++ b/packages/grpc-js/src/load-balancer-pick-first.ts @@ -65,6 +65,8 @@ class PickFirstPicker implements Picker { pickResultType: PickResultType.COMPLETE, subchannel: this.subchannel, status: null, + extraFilterFactory: null, + onCallStarted: null }; } } diff --git a/packages/grpc-js/src/load-balancer-round-robin.ts b/packages/grpc-js/src/load-balancer-round-robin.ts index 93c64610..51244341 100644 --- a/packages/grpc-js/src/load-balancer-round-robin.ts +++ b/packages/grpc-js/src/load-balancer-round-robin.ts @@ -60,6 +60,8 @@ class RoundRobinPicker implements Picker { pickResultType: PickResultType.COMPLETE, subchannel: pickedSubchannel, status: null, + extraFilterFactory: null, + onCallStarted: null, }; } diff --git a/packages/grpc-js/src/picker.ts b/packages/grpc-js/src/picker.ts index 9470f122..306ac3ed 100644 --- a/packages/grpc-js/src/picker.ts +++ b/packages/grpc-js/src/picker.ts @@ -47,6 +47,7 @@ export interface PickResult { * reasons filters from this factory will not see sendMetadata events. */ extraFilterFactory: FilterFactory | null; + onCallStarted: (() => void) | null; } export interface CompletePickResult extends PickResult { @@ -54,6 +55,7 @@ export interface CompletePickResult extends PickResult { subchannel: Subchannel | null; status: null; extraFilterFactory: FilterFactory | null; + onCallStarted: (() => void) | null; } export interface QueuePickResult extends PickResult { @@ -61,6 +63,7 @@ export interface QueuePickResult extends PickResult { subchannel: null; status: null; extraFilterFactory: null; + onCallStarted: null; } export interface TransientFailurePickResult extends PickResult { @@ -68,6 +71,7 @@ export interface TransientFailurePickResult extends PickResult { subchannel: null; status: StatusObject; extraFilterFactory: null; + onCallStarted: null; } export interface PickArgs { @@ -105,7 +109,8 @@ export class UnavailablePicker implements Picker { pickResultType: PickResultType.TRANSIENT_FAILURE, subchannel: null, status: this.status, - extraFilterFactory: null + extraFilterFactory: null, + onCallStarted: null }; } } @@ -133,7 +138,8 @@ export class QueuePicker { pickResultType: PickResultType.QUEUE, subchannel: null, status: null, - extraFilterFactory: null + extraFilterFactory: null, + onCallStarted: null }; } } From a2839e7b2d3515995641054e787657b8b442dafa Mon Sep 17 00:00:00 2001 From: Michael Lumish Date: Mon, 27 Apr 2020 11:56:49 -0700 Subject: [PATCH 4/4] gts fix --- packages/grpc-js/src/call-stream.ts | 5 ++++- packages/grpc-js/src/channel.ts | 4 +++- packages/grpc-js/src/load-balancer-pick-first.ts | 2 +- packages/grpc-js/src/load-balancer.ts | 2 +- packages/grpc-js/src/picker.ts | 4 ++-- packages/grpc-js/src/resolver-uds.ts | 6 +----- packages/grpc-js/src/resolver.ts | 4 ++-- packages/grpc-js/src/resolving-load-balancer.ts | 2 +- packages/grpc-js/src/subchannel.ts | 6 +++++- 9 files changed, 20 insertions(+), 15 deletions(-) diff --git a/packages/grpc-js/src/call-stream.ts b/packages/grpc-js/src/call-stream.ts index 3a853315..57dc3bde 100644 --- a/packages/grpc-js/src/call-stream.ts +++ b/packages/grpc-js/src/call-stream.ts @@ -411,7 +411,10 @@ export class Http2CallStream implements Call { extraFilterFactory?: FilterFactory ): void { if (extraFilterFactory !== undefined) { - this.filterStack = new FilterStack([this.filterStack, extraFilterFactory.createFilter(this)]); + this.filterStack = new FilterStack([ + this.filterStack, + extraFilterFactory.createFilter(this), + ]); } if (this.finalStatus !== null) { stream.close(NGHTTP2_CANCEL); diff --git a/packages/grpc-js/src/channel.ts b/packages/grpc-js/src/channel.ts index b240cc43..fe85dec0 100644 --- a/packages/grpc-js/src/channel.ts +++ b/packages/grpc-js/src/channel.ts @@ -174,7 +174,9 @@ export class ChannelImplementation implements Channel { * resolver */ const defaultSchemeMapResult = mapUriDefaultScheme(originalTargetUri); if (defaultSchemeMapResult === null) { - throw new Error(`Could not find a default scheme for target name "${target}"`); + throw new Error( + `Could not find a default scheme for target name "${target}"` + ); } if (this.options['grpc.default_authority']) { this.defaultAuthority = this.options['grpc.default_authority'] as string; diff --git a/packages/grpc-js/src/load-balancer-pick-first.ts b/packages/grpc-js/src/load-balancer-pick-first.ts index 6289123d..f3888618 100644 --- a/packages/grpc-js/src/load-balancer-pick-first.ts +++ b/packages/grpc-js/src/load-balancer-pick-first.ts @@ -66,7 +66,7 @@ class PickFirstPicker implements Picker { subchannel: this.subchannel, status: null, extraFilterFactory: null, - onCallStarted: null + onCallStarted: null, }; } } diff --git a/packages/grpc-js/src/load-balancer.ts b/packages/grpc-js/src/load-balancer.ts index b1710467..a5a22b56 100644 --- a/packages/grpc-js/src/load-balancer.ts +++ b/packages/grpc-js/src/load-balancer.ts @@ -68,7 +68,7 @@ export interface LoadBalancer { updateAddressList( addressList: SubchannelAddress[], lbConfig: LoadBalancingConfig | null, - attributes: {[key: string]: unknown} + attributes: { [key: string]: unknown } ): void; /** * If the load balancer is currently in the IDLE state, start connecting. diff --git a/packages/grpc-js/src/picker.ts b/packages/grpc-js/src/picker.ts index 306ac3ed..42eeda82 100644 --- a/packages/grpc-js/src/picker.ts +++ b/packages/grpc-js/src/picker.ts @@ -110,7 +110,7 @@ export class UnavailablePicker implements Picker { subchannel: null, status: this.status, extraFilterFactory: null, - onCallStarted: null + onCallStarted: null, }; } } @@ -139,7 +139,7 @@ export class QueuePicker { subchannel: null, status: null, extraFilterFactory: null, - onCallStarted: null + onCallStarted: null, }; } } diff --git a/packages/grpc-js/src/resolver-uds.ts b/packages/grpc-js/src/resolver-uds.ts index 0bc92991..25856913 100644 --- a/packages/grpc-js/src/resolver-uds.ts +++ b/packages/grpc-js/src/resolver-uds.ts @@ -14,11 +14,7 @@ * limitations under the License. */ -import { - Resolver, - ResolverListener, - registerResolver, -} from './resolver'; +import { Resolver, ResolverListener, registerResolver } from './resolver'; import { SubchannelAddress } from './subchannel'; import { GrpcUri } from './uri-parser'; diff --git a/packages/grpc-js/src/resolver.ts b/packages/grpc-js/src/resolver.ts index cef0c1f4..16c84351 100644 --- a/packages/grpc-js/src/resolver.ts +++ b/packages/grpc-js/src/resolver.ts @@ -40,7 +40,7 @@ export interface ResolverListener { addressList: SubchannelAddress[], serviceConfig: ServiceConfig | null, serviceConfigError: StatusObject | null, - attributes: {[key: string]: unknown} + attributes: { [key: string]: unknown } ): void; /** * Called whenever a name resolution attempt fails. @@ -138,7 +138,7 @@ export function mapUriDefaultScheme(target: GrpcUri): GrpcUri | null { return { scheme: defaultScheme, authority: undefined, - path: uriToString(target) + path: uriToString(target), }; } else { return null; diff --git a/packages/grpc-js/src/resolving-load-balancer.ts b/packages/grpc-js/src/resolving-load-balancer.ts index 75ea6eaa..f3e2eda3 100644 --- a/packages/grpc-js/src/resolving-load-balancer.ts +++ b/packages/grpc-js/src/resolving-load-balancer.ts @@ -137,7 +137,7 @@ export class ResolvingLoadBalancer implements LoadBalancer { addressList: SubchannelAddress[], serviceConfig: ServiceConfig | null, serviceConfigError: ServiceError | null, - attributes: {[key: string]: unknown} + attributes: { [key: string]: unknown } ) => { let workingServiceConfig: ServiceConfig | null = null; /* This first group of conditionals implements the algorithm described diff --git a/packages/grpc-js/src/subchannel.ts b/packages/grpc-js/src/subchannel.ts index 713f0d9a..ab7b176c 100644 --- a/packages/grpc-js/src/subchannel.ts +++ b/packages/grpc-js/src/subchannel.ts @@ -620,7 +620,11 @@ export class Subchannel { * @param metadata * @param callStream */ - startCallStream(metadata: Metadata, callStream: Http2CallStream, extraFilterFactory?: FilterFactory) { + startCallStream( + metadata: Metadata, + callStream: Http2CallStream, + extraFilterFactory?: FilterFactory + ) { const headers = metadata.toHttp2Headers(); headers[HTTP2_HEADER_AUTHORITY] = callStream.getHost(); headers[HTTP2_HEADER_USER_AGENT] = this.userAgent;