diff --git a/packages/grpc-js/src/call-stream.ts b/packages/grpc-js/src/call-stream.ts index 11f9adc0..fcfa8af1 100644 --- a/packages/grpc-js/src/call-stream.ts +++ b/packages/grpc-js/src/call-stream.ts @@ -19,8 +19,8 @@ import * as http2 from 'http2'; import { CallCredentials } from './call-credentials'; import { Status } from './constants'; -import { Filter } from './filter'; -import { FilterStackFactory } from './filter-stack'; +import { Filter, FilterFactory } from './filter'; +import { FilterStackFactory, FilterStack } from './filter-stack'; import { Metadata } from './metadata'; import { StreamDecoder } from './stream-decoder'; import { ChannelImplementation } from './channel'; @@ -407,8 +407,15 @@ export class Http2CallStream implements Call { attachHttp2Stream( stream: http2.ClientHttp2Stream, - subchannel: Subchannel + subchannel: Subchannel, + extraFilterFactory?: FilterFactory ): void { + if (extraFilterFactory !== undefined) { + this.filterStack = new FilterStack([ + this.filterStack, + extraFilterFactory.createFilter(this), + ]); + } if (this.finalStatus !== null) { stream.close(NGHTTP2_CANCEL); } else { diff --git a/packages/grpc-js/src/channel.ts b/packages/grpc-js/src/channel.ts index f2a7bc09..fe85dec0 100644 --- a/packages/grpc-js/src/channel.ts +++ b/packages/grpc-js/src/channel.ts @@ -174,7 +174,9 @@ export class ChannelImplementation implements Channel { * resolver */ const defaultSchemeMapResult = mapUriDefaultScheme(originalTargetUri); if (defaultSchemeMapResult === null) { - throw new Error(`Could not find a default scheme for target name "${target}"`); + throw new Error( + `Could not find a default scheme for target name "${target}"` + ); } if (this.options['grpc.default_authority']) { this.defaultAuthority = this.options['grpc.default_authority'] as string; @@ -300,8 +302,12 @@ export class ChannelImplementation implements Channel { try { pickResult.subchannel!.startCallStream( finalMetadata, - callStream + callStream, + pickResult.extraFilterFactory ?? undefined ); + /* If we reach this point, the call stream has started + * successfully */ + pickResult.onCallStarted?.(); } catch (error) { if ( (error as NodeJS.ErrnoException).code === diff --git a/packages/grpc-js/src/load-balancer-pick-first.ts b/packages/grpc-js/src/load-balancer-pick-first.ts index 6b9756ff..f3888618 100644 --- a/packages/grpc-js/src/load-balancer-pick-first.ts +++ b/packages/grpc-js/src/load-balancer-pick-first.ts @@ -65,6 +65,8 @@ class PickFirstPicker implements Picker { pickResultType: PickResultType.COMPLETE, subchannel: this.subchannel, status: null, + extraFilterFactory: null, + onCallStarted: null, }; } } diff --git a/packages/grpc-js/src/load-balancer-round-robin.ts b/packages/grpc-js/src/load-balancer-round-robin.ts index 93c64610..51244341 100644 --- a/packages/grpc-js/src/load-balancer-round-robin.ts +++ b/packages/grpc-js/src/load-balancer-round-robin.ts @@ -60,6 +60,8 @@ class RoundRobinPicker implements Picker { pickResultType: PickResultType.COMPLETE, subchannel: pickedSubchannel, status: null, + extraFilterFactory: null, + onCallStarted: null, }; } diff --git a/packages/grpc-js/src/load-balancer.ts b/packages/grpc-js/src/load-balancer.ts index 5fa4bdc5..a5a22b56 100644 --- a/packages/grpc-js/src/load-balancer.ts +++ b/packages/grpc-js/src/load-balancer.ts @@ -67,7 +67,8 @@ export interface LoadBalancer { */ updateAddressList( addressList: SubchannelAddress[], - lbConfig: LoadBalancingConfig | null + lbConfig: LoadBalancingConfig | null, + attributes: { [key: string]: unknown } ): void; /** * If the load balancer is currently in the IDLE state, start connecting. diff --git a/packages/grpc-js/src/picker.ts b/packages/grpc-js/src/picker.ts index d908f026..42eeda82 100644 --- a/packages/grpc-js/src/picker.ts +++ b/packages/grpc-js/src/picker.ts @@ -20,6 +20,7 @@ import { StatusObject } from './call-stream'; import { Metadata } from './metadata'; import { Status } from './constants'; import { LoadBalancer } from './load-balancer'; +import { FilterFactory, Filter } from './filter'; export enum PickResultType { COMPLETE, @@ -40,24 +41,37 @@ export interface PickResult { * `pickResultType` is TRANSIENT_FAILURE. */ status: StatusObject | null; + /** + * Extra FilterFactory (can be multiple encapsulated in a FilterStackFactory) + * provided by the load balancer to be used with the call. For technical + * reasons filters from this factory will not see sendMetadata events. + */ + extraFilterFactory: FilterFactory | null; + onCallStarted: (() => void) | null; } export interface CompletePickResult extends PickResult { pickResultType: PickResultType.COMPLETE; subchannel: Subchannel | null; status: null; + extraFilterFactory: FilterFactory | null; + onCallStarted: (() => void) | null; } export interface QueuePickResult extends PickResult { pickResultType: PickResultType.QUEUE; subchannel: null; status: null; + extraFilterFactory: null; + onCallStarted: null; } export interface TransientFailurePickResult extends PickResult { pickResultType: PickResultType.TRANSIENT_FAILURE; subchannel: null; status: StatusObject; + extraFilterFactory: null; + onCallStarted: null; } export interface PickArgs { @@ -95,6 +109,8 @@ export class UnavailablePicker implements Picker { pickResultType: PickResultType.TRANSIENT_FAILURE, subchannel: null, status: this.status, + extraFilterFactory: null, + onCallStarted: null, }; } } @@ -122,6 +138,8 @@ export class QueuePicker { pickResultType: PickResultType.QUEUE, subchannel: null, status: null, + extraFilterFactory: null, + onCallStarted: null, }; } } diff --git a/packages/grpc-js/src/resolver-dns.ts b/packages/grpc-js/src/resolver-dns.ts index 274d8c34..016f3bd1 100644 --- a/packages/grpc-js/src/resolver-dns.ts +++ b/packages/grpc-js/src/resolver-dns.ts @@ -124,7 +124,7 @@ class DnsResolver implements Resolver { if (this.ipResult !== null) { trace('Returning IP address for target ' + uriToString(this.target)); setImmediate(() => { - this.listener.onSuccessfulResolution(this.ipResult!, null, null); + this.listener.onSuccessfulResolution(this.ipResult!, null, null, {}); }); return; } @@ -186,7 +186,8 @@ class DnsResolver implements Resolver { this.listener.onSuccessfulResolution( this.latestLookupResult, this.latestServiceConfig, - this.latestServiceConfigError + this.latestServiceConfigError, + {} ); }, (err) => { @@ -230,7 +231,8 @@ class DnsResolver implements Resolver { this.listener.onSuccessfulResolution( this.latestLookupResult, this.latestServiceConfig, - this.latestServiceConfigError + this.latestServiceConfigError, + {} ); } }, @@ -244,7 +246,8 @@ class DnsResolver implements Resolver { this.listener.onSuccessfulResolution( this.latestLookupResult, this.latestServiceConfig, - this.latestServiceConfigError + this.latestServiceConfigError, + {} ); } } diff --git a/packages/grpc-js/src/resolver-uds.ts b/packages/grpc-js/src/resolver-uds.ts index 759cb233..25856913 100644 --- a/packages/grpc-js/src/resolver-uds.ts +++ b/packages/grpc-js/src/resolver-uds.ts @@ -14,11 +14,7 @@ * limitations under the License. */ -import { - Resolver, - ResolverListener, - registerResolver, -} from './resolver'; +import { Resolver, ResolverListener, registerResolver } from './resolver'; import { SubchannelAddress } from './subchannel'; import { GrpcUri } from './uri-parser'; @@ -38,7 +34,8 @@ class UdsResolver implements Resolver { this.listener.onSuccessfulResolution, this.addresses, null, - null + null, + {} ); } diff --git a/packages/grpc-js/src/resolver.ts b/packages/grpc-js/src/resolver.ts index 7d6d1ad5..16c84351 100644 --- a/packages/grpc-js/src/resolver.ts +++ b/packages/grpc-js/src/resolver.ts @@ -39,7 +39,8 @@ export interface ResolverListener { onSuccessfulResolution( addressList: SubchannelAddress[], serviceConfig: ServiceConfig | null, - serviceConfigError: StatusObject | null + serviceConfigError: StatusObject | null, + attributes: { [key: string]: unknown } ): void; /** * Called whenever a name resolution attempt fails. @@ -137,7 +138,7 @@ export function mapUriDefaultScheme(target: GrpcUri): GrpcUri | null { return { scheme: defaultScheme, authority: undefined, - path: uriToString(target) + path: uriToString(target), }; } else { return null; diff --git a/packages/grpc-js/src/resolving-load-balancer.ts b/packages/grpc-js/src/resolving-load-balancer.ts index 48f1ddd4..f3e2eda3 100644 --- a/packages/grpc-js/src/resolving-load-balancer.ts +++ b/packages/grpc-js/src/resolving-load-balancer.ts @@ -136,7 +136,8 @@ export class ResolvingLoadBalancer implements LoadBalancer { onSuccessfulResolution: ( addressList: SubchannelAddress[], serviceConfig: ServiceConfig | null, - serviceConfigError: ServiceError | null + serviceConfigError: ServiceError | null, + attributes: { [key: string]: unknown } ) => { let workingServiceConfig: ServiceConfig | null = null; /* This first group of conditionals implements the algorithm described @@ -211,12 +212,14 @@ export class ResolvingLoadBalancer implements LoadBalancer { )!; this.innerLoadBalancer.updateAddressList( addressList, - loadBalancingConfig + loadBalancingConfig, + attributes ); } else if (this.innerLoadBalancer.getTypeName() === loadBalancerName) { this.innerLoadBalancer.updateAddressList( addressList, - loadBalancingConfig + loadBalancingConfig, + attributes ); } else { if ( @@ -234,7 +237,8 @@ export class ResolvingLoadBalancer implements LoadBalancer { } this.pendingReplacementLoadBalancer.updateAddressList( addressList, - loadBalancingConfig + loadBalancingConfig, + attributes ); } }, diff --git a/packages/grpc-js/src/subchannel.ts b/packages/grpc-js/src/subchannel.ts index 2af7885e..ab7b176c 100644 --- a/packages/grpc-js/src/subchannel.ts +++ b/packages/grpc-js/src/subchannel.ts @@ -30,6 +30,7 @@ import { getProxiedConnection, ProxyConnectionResult } from './http_proxy'; import * as net from 'net'; import { GrpcUri } from './uri-parser'; import { ConnectionOptions } from 'tls'; +import { FilterFactory, Filter } from './filter'; const clientVersion = require('../../package.json').version; @@ -619,7 +620,11 @@ export class Subchannel { * @param metadata * @param callStream */ - startCallStream(metadata: Metadata, callStream: Http2CallStream) { + startCallStream( + metadata: Metadata, + callStream: Http2CallStream, + extraFilterFactory?: FilterFactory + ) { const headers = metadata.toHttp2Headers(); headers[HTTP2_HEADER_AUTHORITY] = callStream.getHost(); headers[HTTP2_HEADER_USER_AGENT] = this.userAgent; @@ -633,7 +638,7 @@ export class Subchannel { headersString += '\t\t' + header + ': ' + headers[header] + '\n'; } trace('Starting stream with headers\n' + headersString); - callStream.attachHttp2Stream(http2Stream, this); + callStream.attachHttp2Stream(http2Stream, this, extraFilterFactory); } /**