diff --git a/packages/grpc-js/src/call-stream.ts b/packages/grpc-js/src/call-stream.ts index 8fcc7065..3a81cbe9 100644 --- a/packages/grpc-js/src/call-stream.ts +++ b/packages/grpc-js/src/call-stream.ts @@ -70,6 +70,17 @@ function getSystemErrorName(errno: number): string { export type Deadline = Date | number; +function getMinDeadline(deadlineList: Deadline[]): Deadline { + let minValue: number = Infinity; + for (const deadline of deadlineList) { + const deadlineMsecs = deadline instanceof Date ? deadline.getTime() : deadline; + if (deadlineMsecs < minValue) { + minValue = deadlineMsecs; + } + } + return minValue; +} + export interface CallStreamOptions { deadline: Deadline; flags: number; @@ -235,6 +246,8 @@ export class Http2CallStream implements Call { private internalError: SystemError | null = null; + private configDeadline: Deadline = Infinity; + constructor( private readonly methodName: string, private readonly channel: ChannelImplementation, @@ -675,15 +688,14 @@ export class Http2CallStream implements Call { } getDeadline(): Deadline { + const deadlineList = [this.options.deadline]; if (this.options.parentCall && this.options.flags & Propagate.DEADLINE) { - const parentDeadline = this.options.parentCall.getDeadline(); - const selfDeadline = this.options.deadline; - const parentDeadlineMsecs = parentDeadline instanceof Date ? parentDeadline.getTime() : parentDeadline; - const selfDeadlineMsecs = selfDeadline instanceof Date ? selfDeadline.getTime() : selfDeadline; - return Math.min(parentDeadlineMsecs, selfDeadlineMsecs); - } else { - return this.options.deadline; + deadlineList.push(this.options.parentCall.getDeadline()); } + if (this.configDeadline) { + deadlineList.push(this.configDeadline); + } + return getMinDeadline(deadlineList); } getCredentials(): CallCredentials { @@ -710,6 +722,10 @@ export class Http2CallStream implements Call { return this.options.host; } + setConfigDeadline(configDeadline: Deadline) { + this.configDeadline = configDeadline; + } + startRead() { /* If the stream has ended with an error, we should not emit any more * messages and we should communicate that the stream has ended */ diff --git a/packages/grpc-js/src/channel.ts b/packages/grpc-js/src/channel.ts index 41715c41..8b2658f3 100644 --- a/packages/grpc-js/src/channel.ts +++ b/packages/grpc-js/src/channel.ts @@ -509,6 +509,11 @@ export class ChannelImplementation implements Channel { } private tryGetConfig(stream: Http2CallStream, metadata: Metadata) { + if (stream.getStatus() !== null) { + /* If the stream has a status, it has already finished and we don't need + * to take any more actions on it. */ + return; + } if (this.configSelector === null) { /* This branch will only be taken at the beginning of the channel's life, * before the resolver ever returns a result. So, the @@ -523,6 +528,14 @@ export class ChannelImplementation implements Channel { } else { const callConfig = this.configSelector(stream.getMethod(), metadata); if (callConfig.status === Status.OK) { + if (callConfig.methodConfig.timeout) { + const deadline = new Date(); + deadline.setSeconds(deadline.getSeconds() + callConfig.methodConfig.timeout.seconds); + deadline.setMilliseconds(deadline.getMilliseconds() + callConfig.methodConfig.timeout.nanos / 1_000_000); + stream.setConfigDeadline(deadline); + // Refreshing the filters makes the deadline filter pick up the new deadline + stream.filterStack.refresh(); + } this.tryPick(stream, metadata, callConfig); } else { stream.cancelWithStatus(callConfig.status, "Failed to route call to method " + stream.getMethod()); diff --git a/packages/grpc-js/src/deadline-filter.ts b/packages/grpc-js/src/deadline-filter.ts index 99bfa2be..afc01554 100644 --- a/packages/grpc-js/src/deadline-filter.ts +++ b/packages/grpc-js/src/deadline-filter.ts @@ -42,30 +42,41 @@ function getDeadline(deadline: number) { export class DeadlineFilter extends BaseFilter implements Filter { private timer: NodeJS.Timer | null = null; - private deadline: number; + private deadline: number = Infinity; constructor( private readonly channel: Channel, private readonly callStream: Call ) { super(); - const callDeadline = callStream.getDeadline(); + this.retreiveDeadline(); + this.runTimer(); + } + + private retreiveDeadline() { + const callDeadline = this.callStream.getDeadline(); if (callDeadline instanceof Date) { this.deadline = callDeadline.getTime(); } else { this.deadline = callDeadline; } + } + + private runTimer() { + if (this.timer) { + clearTimeout(this.timer); + } const now: number = new Date().getTime(); let timeout = this.deadline - now; if (timeout <= 0) { process.nextTick(() => { - callStream.cancelWithStatus( + this.callStream.cancelWithStatus( Status.DEADLINE_EXCEEDED, 'Deadline exceeded' ); }); } else if (this.deadline !== Infinity) { this.timer = setTimeout(() => { - callStream.cancelWithStatus( + this.callStream.cancelWithStatus( Status.DEADLINE_EXCEEDED, 'Deadline exceeded' ); @@ -74,6 +85,11 @@ export class DeadlineFilter extends BaseFilter implements Filter { } } + refresh() { + this.retreiveDeadline(); + this.runTimer(); + } + async sendMetadata(metadata: Promise) { if (this.deadline === Infinity) { return metadata; diff --git a/packages/grpc-js/src/filter-stack.ts b/packages/grpc-js/src/filter-stack.ts index a656a409..4fd88854 100644 --- a/packages/grpc-js/src/filter-stack.ts +++ b/packages/grpc-js/src/filter-stack.ts @@ -71,6 +71,12 @@ export class FilterStack implements Filter { return result; } + + refresh(): void { + for (const filter of this.filters) { + filter.refresh(); + } + } } export class FilterStackFactory implements FilterFactory { diff --git a/packages/grpc-js/src/filter.ts b/packages/grpc-js/src/filter.ts index c1e412ae..eb67bd32 100644 --- a/packages/grpc-js/src/filter.ts +++ b/packages/grpc-js/src/filter.ts @@ -32,6 +32,8 @@ export interface Filter { receiveMessage(message: Promise): Promise; receiveTrailers(status: StatusObject): StatusObject; + + refresh(): void; } export abstract class BaseFilter implements Filter { @@ -54,6 +56,9 @@ export abstract class BaseFilter implements Filter { receiveTrailers(status: StatusObject): StatusObject { return status; } + + refresh(): void { + } } export interface FilterFactory { diff --git a/packages/grpc-js/src/service-config.ts b/packages/grpc-js/src/service-config.ts index ed225e08..efc09f9c 100644 --- a/packages/grpc-js/src/service-config.ts +++ b/packages/grpc-js/src/service-config.ts @@ -34,10 +34,15 @@ export interface MethodConfigName { method?: string; } +export interface Duration { + seconds: number; + nanos: number; +} + export interface MethodConfig { name: MethodConfigName[]; waitForReady?: boolean; - timeout?: string; + timeout?: Duration; maxRequestBytes?: number; maxResponseBytes?: number; } @@ -101,13 +106,25 @@ function validateMethodConfig(obj: any): MethodConfig { result.waitForReady = obj.waitForReady; } if ('timeout' in obj) { - if ( - !(typeof obj.timeout === 'string') || - !TIMEOUT_REGEX.test(obj.timeout) + if (typeof obj.timeout === 'object') { + if (!('seconds' in obj.timeout) || !(typeof obj.timeout.seconds === 'number')) { + throw new Error('Invalid method config: invalid timeout.seconds'); + } + if (!('nanos' in obj.timeout) || !(typeof obj.timeout.nanos === 'number')) { + throw new Error('Invalid method config: invalid timeout.nanos'); + } + result.timeout = obj.timeout; + } else if ( + (typeof obj.timeout === 'string') && TIMEOUT_REGEX.test(obj.timeout) ) { + const timeoutParts = obj.timeout.substring(0, obj.timeout.length - 1).split('.'); + result.timeout = { + seconds: timeoutParts[0] | 0, + nanos: (timeoutParts[1] ?? 0) | 0 + } + } else { throw new Error('Invalid method config: invalid timeout'); } - result.timeout = obj.timeout; } if ('maxRequestBytes' in obj) { if (typeof obj.maxRequestBytes !== 'number') {