diff --git a/packages/grpc-js-xds/README.md b/packages/grpc-js-xds/README.md index 2ada0bad..bcea7045 100644 --- a/packages/grpc-js-xds/README.md +++ b/packages/grpc-js-xds/README.md @@ -21,4 +21,5 @@ const client = new MyServiceClient('xds:///example.com:123'); ## Supported Features - - [xDS-Based Global Load Balancing](https://github.com/grpc/proposal/blob/master/A27-xds-global-load-balancing.md) \ No newline at end of file + - [xDS-Based Global Load Balancing](https://github.com/grpc/proposal/blob/master/A27-xds-global-load-balancing.md) + - [xDS traffic splitting and routing](https://github.com/grpc/proposal/blob/master/A28-xds-traffic-splitting-and-routing.md) \ No newline at end of file diff --git a/packages/grpc-js-xds/package.json b/packages/grpc-js-xds/package.json index f69c8bf4..c51cfcd9 100644 --- a/packages/grpc-js-xds/package.json +++ b/packages/grpc-js-xds/package.json @@ -1,6 +1,6 @@ { "name": "@grpc/grpc-js-xds", - "version": "1.2.4", + "version": "1.3.1", "description": "Plugin for @grpc/grpc-js. Adds the xds:// URL scheme and associated features.", "main": "build/src/index.js", "scripts": { @@ -47,7 +47,7 @@ "re2-wasm": "^1.0.1" }, "peerDependencies": { - "@grpc/grpc-js": "~1.2.10" + "@grpc/grpc-js": "~1.3.0" }, "engines": { "node": ">=10.10.0" diff --git a/packages/grpc-js-xds/src/matcher.ts b/packages/grpc-js-xds/src/matcher.ts index a70cf5d6..14cb7f67 100644 --- a/packages/grpc-js-xds/src/matcher.ts +++ b/packages/grpc-js-xds/src/matcher.ts @@ -197,8 +197,8 @@ export class PathExactValueMatcher { export class PathSafeRegexValueMatcher { private targetRegexImpl: RE2; - constructor(targetRegex: string, caseInsensitive: boolean) { - this.targetRegexImpl = new RE2(`^${targetRegex}$`, caseInsensitive ? 'iu' : 'u'); + constructor(targetRegex: string) { + this.targetRegexImpl = new RE2(`^${targetRegex}$`, 'u'); } apply(value: string) { diff --git a/packages/grpc-js-xds/src/resolver-xds.ts b/packages/grpc-js-xds/src/resolver-xds.ts index 892cf571..26a39292 100644 --- a/packages/grpc-js-xds/src/resolver-xds.ts +++ b/packages/grpc-js-xds/src/resolver-xds.ts @@ -171,7 +171,7 @@ function getPredicateForMatcher(routeMatch: RouteMatch__Output): Matcher { pathMatcher = new PathExactValueMatcher(routeMatch.path!, caseInsensitive); break; case 'safe_regex': - pathMatcher = new PathSafeRegexValueMatcher(routeMatch.safe_regex!.regex, caseInsensitive); + pathMatcher = new PathSafeRegexValueMatcher(routeMatch.safe_regex!.regex); break; default: pathMatcher = new RejectValueMatcher(); diff --git a/packages/grpc-js/package.json b/packages/grpc-js/package.json index 3733e06f..255ee95e 100644 --- a/packages/grpc-js/package.json +++ b/packages/grpc-js/package.json @@ -1,6 +1,6 @@ { "name": "@grpc/grpc-js", - "version": "1.2.12", + "version": "1.3.4", "description": "gRPC Library for Node - pure JS implementation", "homepage": "https://grpc.io/", "repository": "https://github.com/grpc/grpc-node/tree/master/packages/grpc-js", diff --git a/packages/grpc-js/src/call-stream.ts b/packages/grpc-js/src/call-stream.ts index ae634260..083ebe85 100644 --- a/packages/grpc-js/src/call-stream.ts +++ b/packages/grpc-js/src/call-stream.ts @@ -597,7 +597,7 @@ export class Http2CallStream implements Call { * "Internal server error" message. */ details = `Received RST_STREAM with code ${stream.rstCode} (Internal server error)`; } else { - if (this.internalError.errno === os.constants.errno.ECONNRESET) { + if (this.internalError.code === 'ECONNRESET') { code = Status.UNAVAILABLE; details = this.internalError.message; } else { diff --git a/packages/grpc-js/src/client.ts b/packages/grpc-js/src/client.ts index 7a2199b0..d19fe7d0 100644 --- a/packages/grpc-js/src/client.ts +++ b/packages/grpc-js/src/client.ts @@ -56,6 +56,10 @@ const INTERCEPTOR_SYMBOL = Symbol(); const INTERCEPTOR_PROVIDER_SYMBOL = Symbol(); const CALL_INVOCATION_TRANSFORMER_SYMBOL = Symbol(); +function isFunction(arg: Metadata | CallOptions | UnaryCallback | undefined): arg is UnaryCallback{ + return typeof arg === 'function'; +} + export interface UnaryCallback { (err: ServiceError | null, value?: ResponseType): void; } @@ -199,9 +203,9 @@ export class Client { options: CallOptions; callback: UnaryCallback; } { - if (arg1 instanceof Function) { + if (isFunction(arg1)) { return { metadata: new Metadata(), options: {}, callback: arg1 }; - } else if (arg2 instanceof Function) { + } else if (isFunction(arg2)) { if (arg1 instanceof Metadata) { return { metadata: arg1, options: {}, callback: arg2 }; } else { @@ -212,7 +216,7 @@ export class Client { !( arg1 instanceof Metadata && arg2 instanceof Object && - arg3 instanceof Function + isFunction(arg3) ) ) { throw new Error('Incorrect arguments passed'); @@ -672,3 +676,4 @@ export class Client { return stream; } } + diff --git a/packages/grpc-js/src/logging.ts b/packages/grpc-js/src/logging.ts index de8d2d05..71683dbf 100644 --- a/packages/grpc-js/src/logging.ts +++ b/packages/grpc-js/src/logging.ts @@ -22,7 +22,7 @@ let _logVerbosity: LogVerbosity = LogVerbosity.ERROR; const verbosityString = process.env.GRPC_NODE_VERBOSITY ?? process.env.GRPC_VERBOSITY ?? ''; -switch (verbosityString) { +switch (verbosityString.toUpperCase()) { case 'DEBUG': _logVerbosity = LogVerbosity.DEBUG; break; diff --git a/packages/grpc-js/src/resolver-ip.ts b/packages/grpc-js/src/resolver-ip.ts new file mode 100644 index 00000000..5c9e29c4 --- /dev/null +++ b/packages/grpc-js/src/resolver-ip.ts @@ -0,0 +1,107 @@ +/* + * Copyright 2021 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import { isIPv4, isIPv6 } from "net"; +import { StatusObject } from "./call-stream"; +import { ChannelOptions } from "./channel-options"; +import { LogVerbosity, Status } from "./constants"; +import { Metadata } from "./metadata"; +import { registerResolver, Resolver, ResolverListener } from "./resolver"; +import { SubchannelAddress } from "./subchannel"; +import { GrpcUri, splitHostPort, uriToString } from "./uri-parser"; +import * as logging from './logging'; + +const TRACER_NAME = 'ip_resolver'; + +function trace(text: string): void { + logging.trace(LogVerbosity.DEBUG, TRACER_NAME, text); +} + +const IPV4_SCHEME = 'ipv4'; +const IPV6_SCHEME = 'ipv6'; + +/** + * The default TCP port to connect to if not explicitly specified in the target. + */ +const DEFAULT_PORT = 443; + +class IpResolver implements Resolver { + private addresses: SubchannelAddress[] = []; + private error: StatusObject | null = null; + constructor( + private target: GrpcUri, + private listener: ResolverListener, + channelOptions: ChannelOptions + ) { + trace('Resolver constructed for target ' + uriToString(target)); + const addresses: SubchannelAddress[] = []; + if (!(target.scheme === IPV4_SCHEME || target.scheme === IPV6_SCHEME)) { + this.error = { + code: Status.UNAVAILABLE, + details: `Unrecognized scheme ${target.scheme} in IP resolver`, + metadata: new Metadata() + }; + return; + } + const pathList = target.path.split(','); + for (const path of pathList) { + const hostPort = splitHostPort(path); + if (hostPort === null) { + this.error = { + code: Status.UNAVAILABLE, + details: `Failed to parse ${target.scheme} address ${path}`, + metadata: new Metadata() + }; + return; + } + if ((target.scheme === IPV4_SCHEME && !isIPv4(hostPort.host)) || (target.scheme === IPV6_SCHEME && !isIPv6(hostPort.host))) { + this.error = { + code: Status.UNAVAILABLE, + details: `Failed to parse ${target.scheme} address ${path}`, + metadata: new Metadata() + }; + return; + } + addresses.push({ + host: hostPort.host, + port: hostPort.port ?? DEFAULT_PORT + }); + } + this.addresses = addresses; + trace('Parsed ' + target.scheme + ' address list ' + this.addresses); + } + updateResolution(): void { + process.nextTick(() => { + if (this.error) { + this.listener.onError(this.error) + } else { + this.listener.onSuccessfulResolution(this.addresses, null, null, null, {}); + } + }); + } + destroy(): void { + // This resolver owns no resources, so we do nothing here. + } + + static getDefaultAuthority(target: GrpcUri): string { + return target.path.split(',')[0]; + } +} + +export function setup() { + registerResolver(IPV4_SCHEME, IpResolver); + registerResolver(IPV6_SCHEME, IpResolver); +} \ No newline at end of file diff --git a/packages/grpc-js/src/subchannel.ts b/packages/grpc-js/src/subchannel.ts index a76ea963..7d3e6e87 100644 --- a/packages/grpc-js/src/subchannel.ts +++ b/packages/grpc-js/src/subchannel.ts @@ -235,6 +235,7 @@ export class Subchannel { this.keepaliveTimeoutId = setTimeout(() => { this.transitionToState([ConnectivityState.READY], ConnectivityState.IDLE); }, this.keepaliveTimeoutMs); + this.keepaliveTimeoutId.unref?.(); this.session!.ping( (err: Error | null, duration: number, payload: Buffer) => { clearTimeout(this.keepaliveTimeoutId); @@ -246,6 +247,7 @@ export class Subchannel { this.keepaliveIntervalId = setInterval(() => { this.sendPing(); }, this.keepaliveTimeMs); + this.keepaliveIntervalId.unref?.() /* Don't send a ping immediately because whatever caused us to start * sending pings should also involve some network activity. */ } @@ -553,7 +555,6 @@ export class Subchannel { this.transitionToState( [ ConnectivityState.CONNECTING, - ConnectivityState.IDLE, ConnectivityState.READY, ], ConnectivityState.TRANSIENT_FAILURE diff --git a/packages/grpc-js/test/test-resolver.ts b/packages/grpc-js/test/test-resolver.ts index 976e6489..b5a4c164 100644 --- a/packages/grpc-js/test/test-resolver.ts +++ b/packages/grpc-js/test/test-resolver.ts @@ -391,6 +391,186 @@ describe('Name Resolver', () => { resolver.updateResolution(); }); }); + describe('IP Addresses', () => { + it('should handle one IPv4 address with no port', done => { + const target = resolverManager.mapUriDefaultScheme(parseUri('ipv4:127.0.0.1')!)!; + const listener: resolverManager.ResolverListener = { + onSuccessfulResolution: ( + addressList: SubchannelAddress[], + serviceConfig: ServiceConfig | null, + serviceConfigError: StatusObject | null + ) => { + // Only handle the first resolution result + listener.onSuccessfulResolution = () => {}; + assert( + addressList.some( + addr => + isTcpSubchannelAddress(addr) && + addr.host === '127.0.0.1' && + addr.port === 443 + ) + ); + done(); + }, + onError: (error: StatusObject) => { + done(new Error(`Failed with status ${error.details}`)); + }, + }; + const resolver = resolverManager.createResolver(target, listener, {}); + resolver.updateResolution(); + }); + it('should handle one IPv4 address with a port', done => { + const target = resolverManager.mapUriDefaultScheme(parseUri('ipv4:127.0.0.1:50051')!)!; + const listener: resolverManager.ResolverListener = { + onSuccessfulResolution: ( + addressList: SubchannelAddress[], + serviceConfig: ServiceConfig | null, + serviceConfigError: StatusObject | null + ) => { + // Only handle the first resolution result + listener.onSuccessfulResolution = () => {}; + assert( + addressList.some( + addr => + isTcpSubchannelAddress(addr) && + addr.host === '127.0.0.1' && + addr.port === 50051 + ) + ); + done(); + }, + onError: (error: StatusObject) => { + done(new Error(`Failed with status ${error.details}`)); + }, + }; + const resolver = resolverManager.createResolver(target, listener, {}); + resolver.updateResolution(); + }); + it('should handle multiple IPv4 addresses with different ports', done => { + const target = resolverManager.mapUriDefaultScheme(parseUri('ipv4:127.0.0.1:50051,127.0.0.1:50052')!)!; + const listener: resolverManager.ResolverListener = { + onSuccessfulResolution: ( + addressList: SubchannelAddress[], + serviceConfig: ServiceConfig | null, + serviceConfigError: StatusObject | null + ) => { + // Only handle the first resolution result + listener.onSuccessfulResolution = () => {}; + assert( + addressList.some( + addr => + isTcpSubchannelAddress(addr) && + addr.host === '127.0.0.1' && + addr.port === 50051 + ) + ); + assert( + addressList.some( + addr => + isTcpSubchannelAddress(addr) && + addr.host === '127.0.0.1' && + addr.port === 50052 + ) + ); + done(); + }, + onError: (error: StatusObject) => { + done(new Error(`Failed with status ${error.details}`)); + }, + }; + const resolver = resolverManager.createResolver(target, listener, {}); + resolver.updateResolution(); + }); + it('should handle one IPv6 address with no port', done => { + const target = resolverManager.mapUriDefaultScheme(parseUri('ipv6:::1')!)!; + const listener: resolverManager.ResolverListener = { + onSuccessfulResolution: ( + addressList: SubchannelAddress[], + serviceConfig: ServiceConfig | null, + serviceConfigError: StatusObject | null + ) => { + // Only handle the first resolution result + listener.onSuccessfulResolution = () => {}; + assert( + addressList.some( + addr => + isTcpSubchannelAddress(addr) && + addr.host === '::1' && + addr.port === 443 + ) + ); + done(); + }, + onError: (error: StatusObject) => { + done(new Error(`Failed with status ${error.details}`)); + }, + }; + const resolver = resolverManager.createResolver(target, listener, {}); + resolver.updateResolution(); + }); + it('should handle one IPv6 address with a port', done => { + const target = resolverManager.mapUriDefaultScheme(parseUri('ipv6:[::1]:50051')!)!; + const listener: resolverManager.ResolverListener = { + onSuccessfulResolution: ( + addressList: SubchannelAddress[], + serviceConfig: ServiceConfig | null, + serviceConfigError: StatusObject | null + ) => { + // Only handle the first resolution result + listener.onSuccessfulResolution = () => {}; + assert( + addressList.some( + addr => + isTcpSubchannelAddress(addr) && + addr.host === '::1' && + addr.port === 50051 + ) + ); + done(); + }, + onError: (error: StatusObject) => { + done(new Error(`Failed with status ${error.details}`)); + }, + }; + const resolver = resolverManager.createResolver(target, listener, {}); + resolver.updateResolution(); + }); + it('should handle multiple IPv6 addresses with different ports', done => { + const target = resolverManager.mapUriDefaultScheme(parseUri('ipv6:[::1]:50051,[::1]:50052')!)!; + const listener: resolverManager.ResolverListener = { + onSuccessfulResolution: ( + addressList: SubchannelAddress[], + serviceConfig: ServiceConfig | null, + serviceConfigError: StatusObject | null + ) => { + // Only handle the first resolution result + listener.onSuccessfulResolution = () => {}; + assert( + addressList.some( + addr => + isTcpSubchannelAddress(addr) && + addr.host === '::1' && + addr.port === 50051 + ) + ); + assert( + addressList.some( + addr => + isTcpSubchannelAddress(addr) && + addr.host === '::1' && + addr.port === 50052 + ) + ); + done(); + }, + onError: (error: StatusObject) => { + done(new Error(`Failed with status ${error.details}`)); + }, + }; + const resolver = resolverManager.createResolver(target, listener, {}); + resolver.updateResolution(); + }); + }); describe('getDefaultAuthority', () => { class OtherResolver implements resolverManager.Resolver { updateResolution() {