lint and formatting fixes

This commit is contained in:
murgatroid99 2019-11-14 17:26:40 -08:00
parent b15692fa2a
commit 4285761157
6 changed files with 308 additions and 125 deletions

View File

@ -65,6 +65,7 @@ export interface MetadataListener {
}
export interface MessageListener {
// tslint:disable-next-line no-any
(message: any, next: (message: any) => void): void;
}
@ -85,29 +86,39 @@ export type Listener = Partial<FullListener>;
*/
export interface InterceptingListener {
onReceiveMetadata(metadata: Metadata): void;
// tslint:disable-next-line no-any
onReceiveMessage(message: any): void;
onReceiveStatus(status: StatusObject): void;
}
export function isInterceptingListener(listener: Listener | InterceptingListener): listener is InterceptingListener {
return listener.onReceiveMetadata !== undefined && listener.onReceiveMetadata.length === 1;
export function isInterceptingListener(
listener: Listener | InterceptingListener
): listener is InterceptingListener {
return (
listener.onReceiveMetadata !== undefined &&
listener.onReceiveMetadata.length === 1
);
}
export class InterceptingListenerImpl implements InterceptingListener {
private processingMessage = false;
private pendingStatus: StatusObject | null = null;
constructor(private listener: FullListener, private nextListener: InterceptingListener) {}
constructor(
private listener: FullListener,
private nextListener: InterceptingListener
) {}
onReceiveMetadata(metadata: Metadata): void {
this.listener.onReceiveMetadata(metadata, (metadata) => {
this.listener.onReceiveMetadata(metadata, metadata => {
this.nextListener.onReceiveMetadata(metadata);
});
}
// tslint:disable-next-line no-any
onReceiveMessage(message: any): void {
/* If this listener processes messages asynchronously, the last message may
* be reordered with respect to the status */
this.processingMessage = true;
this.listener.onReceiveMessage(message, (msg) => {
this.listener.onReceiveMessage(message, msg => {
this.processingMessage = false;
this.nextListener.onReceiveMessage(msg);
if (this.pendingStatus) {
@ -116,7 +127,7 @@ export class InterceptingListenerImpl implements InterceptingListener {
});
}
onReceiveStatus(status: StatusObject): void {
this.listener.onReceiveStatus(status, (processedStatus) => {
this.listener.onReceiveStatus(status, processedStatus => {
if (this.processingMessage) {
this.pendingStatus = processedStatus;
} else {
@ -139,7 +150,7 @@ export interface Call {
cancelWithStatus(status: Status, details: string): void;
getPeer(): string;
start(metadata: Metadata, listener: InterceptingListener): void;
sendMessageWithContext(context: MessageContext, message: any): void;
sendMessageWithContext(context: MessageContext, message: Buffer): void;
startRead(): void;
halfClose(): void;
@ -235,7 +246,13 @@ export class Http2CallStream implements Call {
/* The combination check of readsClosed and that the two message buffer
* arrays are empty checks that there all incoming data has been fully
* processed */
if (this.finalStatus.code !== Status.OK || (this.readsClosed && this.unpushedReadMessages.length === 0 && this.unfilteredReadMessages.length === 0 && !this.isReadFilterPending)) {
if (
this.finalStatus.code !== Status.OK ||
(this.readsClosed &&
this.unpushedReadMessages.length === 0 &&
this.unfilteredReadMessages.length === 0 &&
!this.isReadFilterPending)
) {
this.outputStatus();
}
}
@ -259,7 +276,7 @@ export class Http2CallStream implements Call {
}
this.isReadFilterPending = false;
if (this.canPush) {
this.push(message)
this.push(message);
this.canPush = false;
this.http2Stream!.pause();
} else {
@ -530,15 +547,19 @@ export class Http2CallStream implements Call {
}
private maybeCloseWrites() {
if (this.writesClosed && !this.isWriteFilterPending && this.http2Stream !== null) {
if (
this.writesClosed &&
!this.isWriteFilterPending &&
this.http2Stream !== null
) {
this.http2Stream.end();
}
}
sendMessageWithContext(context: MessageContext, message: Buffer) {
const writeObj: WriteObject = {
message: message,
flags: context.flags
message,
flags: context.flags,
};
const cb: WriteCallback = context.callback ?? (() => {});
this.isWriteFilterPending = true;
@ -558,4 +579,4 @@ export class Http2CallStream implements Call {
this.writesClosed = true;
this.maybeCloseWrites();
}
}
}

View File

@ -136,8 +136,8 @@ export class ClientWritableStreamImpl<RequestType> extends Writable
_write(chunk: RequestType, encoding: string, cb: WriteCallback) {
const context: MessageContext = {
callback: cb
}
callback: cb,
};
const flags: number = Number(encoding);
if (!Number.isNaN(flags)) {
context.flags = flags;
@ -175,8 +175,8 @@ export class ClientDuplexStreamImpl<RequestType, ResponseType> extends Duplex
_write(chunk: RequestType, encoding: string, cb: WriteCallback) {
const context: MessageContext = {
callback: cb
}
callback: cb,
};
const flags: number = Number(encoding);
if (!Number.isNaN(flags)) {
context.flags = flags;

View File

@ -16,7 +16,24 @@
*/
import { Metadata } from './metadata';
import { StatusObject, CallStreamOptions, Listener, MetadataListener, MessageListener, StatusListener, FullListener, InterceptingListener, WriteObject, WriteCallback, InterceptingListenerImpl, isInterceptingListener, MessageContext, Http2CallStream, Deadline, Call } from './call-stream';
import {
StatusObject,
CallStreamOptions,
Listener,
MetadataListener,
MessageListener,
StatusListener,
FullListener,
InterceptingListener,
WriteObject,
WriteCallback,
InterceptingListenerImpl,
isInterceptingListener,
MessageContext,
Http2CallStream,
Deadline,
Call,
} from './call-stream';
import { Status } from './constants';
import { Channel } from './channel';
import { CallOptions } from './client';
@ -36,10 +53,18 @@ export class InterceptorConfigurationError extends Error {
}
export interface MetadataRequester {
(metadata: Metadata, listener: InterceptingListener, next: (metadata: Metadata, listener: InterceptingListener | Listener) => void): void;
(
metadata: Metadata,
listener: InterceptingListener,
next: (
metadata: Metadata,
listener: InterceptingListener | Listener
) => void
): void;
}
export interface MessageRequester {
// tslint:disable-next-line no-any
(message: any, next: (message: any) => void): void;
}
@ -87,8 +112,8 @@ export class ListenerBuilder {
return {
onReceiveMetadata: this.metadata,
onReceiveMessage: this.message,
onReceiveStatus: this.status
}
onReceiveStatus: this.status,
};
}
}
@ -123,7 +148,7 @@ export class RequesterBuilder {
start: this.start,
sendMessage: this.message,
halfClose: this.halfClose,
cancel: this.cancel
cancel: this.cancel,
};
}
}
@ -141,7 +166,7 @@ const defaultListener: FullListener = {
},
onReceiveStatus: (status, next) => {
next(status);
}
},
};
/**
@ -155,15 +180,16 @@ const defaultRequester: FullRequester = {
sendMessage: (message, next) => {
next(message);
},
halfClose: (next) => {
halfClose: next => {
next();
},
cancel: (next) => {
cancel: next => {
next();
}
}
},
};
export interface InterceptorOptions extends CallOptions {
// tslint:disable-next-line no-any
method_definition: ClientMethodDefinition<any, any>;
}
@ -171,7 +197,9 @@ export interface InterceptingCallInterface {
cancelWithStatus(status: Status, details: string): void;
getPeer(): string;
start(metadata: Metadata, listener?: Partial<InterceptingListener>): void;
// tslint:disable-next-line no-any
sendMessageWithContext(context: MessageContext, message: any): void;
// tslint:disable-next-line no-any
sendMessage(message: any): void;
startRead(): void;
halfClose(): void;
@ -194,14 +222,17 @@ export class InterceptingCall implements InterceptingCallInterface {
* a message was still being processed.
*/
private pendingHalfClose = false;
constructor(private nextCall: InterceptingCallInterface, requester?: Requester) {
constructor(
private nextCall: InterceptingCallInterface,
requester?: Requester
) {
if (requester) {
this.requester = {
start: requester.start ?? defaultRequester.start,
sendMessage: requester.sendMessage ?? defaultRequester.sendMessage,
halfClose: requester.halfClose ?? defaultRequester.halfClose,
cancel: requester.cancel ?? defaultRequester.cancel
}
cancel: requester.cancel ?? defaultRequester.cancel,
};
} else {
this.requester = defaultRequester;
}
@ -216,30 +247,46 @@ export class InterceptingCall implements InterceptingCallInterface {
getPeer() {
return this.nextCall.getPeer();
}
start(metadata: Metadata, interceptingListener?: Partial<InterceptingListener>): void {
start(
metadata: Metadata,
interceptingListener?: Partial<InterceptingListener>
): void {
const fullInterceptingListener: InterceptingListener = {
onReceiveMetadata: interceptingListener?.onReceiveMetadata?.bind(interceptingListener) ?? (metadata => {}),
onReceiveMessage: interceptingListener?.onReceiveMessage?.bind(interceptingListener) ?? (message => {}),
onReceiveStatus: interceptingListener?.onReceiveStatus?.bind(interceptingListener) ?? (status => {})
}
onReceiveMetadata:
interceptingListener?.onReceiveMetadata?.bind(interceptingListener) ??
(metadata => {}),
onReceiveMessage:
interceptingListener?.onReceiveMessage?.bind(interceptingListener) ??
(message => {}),
onReceiveStatus:
interceptingListener?.onReceiveStatus?.bind(interceptingListener) ??
(status => {}),
};
this.requester.start(metadata, fullInterceptingListener, (md, listener) => {
let finalInterceptingListener: InterceptingListener;
if (isInterceptingListener(listener)) {
finalInterceptingListener = listener;
} else {
const fullListener: FullListener = {
onReceiveMetadata: listener.onReceiveMetadata ?? defaultListener.onReceiveMetadata,
onReceiveMessage: listener.onReceiveMessage ?? defaultListener.onReceiveMessage,
onReceiveStatus: listener.onReceiveStatus ?? defaultListener.onReceiveStatus
onReceiveMetadata:
listener.onReceiveMetadata ?? defaultListener.onReceiveMetadata,
onReceiveMessage:
listener.onReceiveMessage ?? defaultListener.onReceiveMessage,
onReceiveStatus:
listener.onReceiveStatus ?? defaultListener.onReceiveStatus,
};
finalInterceptingListener = new InterceptingListenerImpl(fullListener, fullInterceptingListener);
finalInterceptingListener = new InterceptingListenerImpl(
fullListener,
fullInterceptingListener
);
}
this.nextCall.start(md, finalInterceptingListener);
});
}
// tslint:disable-next-line no-any
sendMessageWithContext(context: MessageContext, message: any): void {
this.processingMessage = true;
this.requester.sendMessage(message, (finalMessage) => {
this.requester.sendMessage(message, finalMessage => {
this.processingMessage = false;
this.nextCall.sendMessageWithContext(context, finalMessage);
if (this.pendingHalfClose) {
@ -247,6 +294,7 @@ export class InterceptingCall implements InterceptingCallInterface {
}
});
}
// tslint:disable-next-line no-any
sendMessage(message: any): void {
this.sendMessageWithContext({}, message);
}
@ -268,23 +316,22 @@ export class InterceptingCall implements InterceptingCallInterface {
}
function getCall(channel: Channel, path: string, options: CallOptions): Call {
var deadline;
var host;
var parent;
var propagate_flags;
var credentials;
let deadline;
let host;
const parent = null;
let propagateFlags;
let credentials;
if (options) {
deadline = options.deadline;
host = options.host;
propagate_flags = options.propagate_flags;
propagateFlags = options.propagate_flags;
credentials = options.credentials;
}
if (deadline === undefined) {
deadline = Infinity;
}
var call = channel.createCall(path, deadline, host,
parent, propagate_flags);
const call = channel.createCall(path, deadline, host, parent, propagateFlags);
if (credentials) {
call.setCredentials(credentials);
}
@ -296,7 +343,11 @@ function getCall(channel: Channel, path: string, options: CallOptions): Call {
* object and handles serialization and deseraizliation.
*/
class BaseInterceptingCall implements InterceptingCallInterface {
constructor(protected call: Call, protected methodDefinition: ClientMethodDefinition<any, any>) {}
// tslint:disable-next-line no-any
constructor(
protected call: Call,
protected methodDefinition: ClientMethodDefinition<any, any>
) {}
cancelWithStatus(status: Status, details: string): void {
this.call.cancelWithStatus(status, details);
}
@ -306,6 +357,7 @@ class BaseInterceptingCall implements InterceptingCallInterface {
setCredentials(credentials: CallCredentials): void {
this.call.setCredentials(credentials);
}
// tslint:disable-next-line no-any
sendMessageWithContext(context: MessageContext, message: any): void {
let serialized: Buffer;
try {
@ -315,32 +367,41 @@ class BaseInterceptingCall implements InterceptingCallInterface {
this.call.cancelWithStatus(Status.INTERNAL, 'Serialization failure');
}
}
// tslint:disable-next-line no-any
sendMessage(message: any) {
this.sendMessageWithContext({}, message);
}
start(metadata: Metadata, interceptingListener?: Partial<InterceptingListener>): void {
start(
metadata: Metadata,
interceptingListener?: Partial<InterceptingListener>
): void {
let readError: StatusObject | null = null;
this.call.start(metadata, {
onReceiveMetadata: (metadata) => {
onReceiveMetadata: metadata => {
interceptingListener?.onReceiveMetadata?.(metadata);
},
onReceiveMessage: (message) => {
onReceiveMessage: message => {
// tslint:disable-next-line no-any
let deserialized: any;
try {
deserialized = this.methodDefinition.responseDeserialize(message);
interceptingListener?.onReceiveMessage?.(deserialized);
} catch (e) {
readError = {code: Status.INTERNAL, details: 'Failed to parse server response', metadata: new Metadata()};
readError = {
code: Status.INTERNAL,
details: 'Failed to parse server response',
metadata: new Metadata(),
};
this.call.cancelWithStatus(readError.code, readError.details);
}
},
onReceiveStatus: (status) => {
onReceiveStatus: status => {
if (readError) {
interceptingListener?.onReceiveStatus?.(readError);
} else {
interceptingListener?.onReceiveStatus?.(status);
}
}
},
});
}
startRead() {
@ -355,14 +416,18 @@ class BaseInterceptingCall implements InterceptingCallInterface {
* BaseInterceptingCall with special-cased behavior for methods with unary
* responses.
*/
class BaseUnaryInterceptingCall extends BaseInterceptingCall implements InterceptingCallInterface {
class BaseUnaryInterceptingCall extends BaseInterceptingCall
implements InterceptingCallInterface {
// tslint:disable-next-line no-any
constructor(call: Call, methodDefinition: ClientMethodDefinition<any, any>) {
super(call, methodDefinition);
}
start(metadata: Metadata, listener?: Partial<InterceptingListener>): void {
let receivedMessage = false;
const wrapperListener: InterceptingListener = {
onReceiveMetadata: listener?.onReceiveMetadata?.bind(listener) ?? (metadata => {}),
onReceiveMetadata:
listener?.onReceiveMetadata?.bind(listener) ?? (metadata => {}),
// tslint:disable-next-line no-any
onReceiveMessage: (message: any) => {
receivedMessage = true;
listener?.onReceiveMessage?.(message);
@ -372,8 +437,8 @@ class BaseUnaryInterceptingCall extends BaseInterceptingCall implements Intercep
listener?.onReceiveMessage?.(null);
}
listener?.onReceiveStatus?.(status);
}
}
},
};
super.start(metadata, wrapperListener);
this.call.startRead();
}
@ -383,9 +448,16 @@ class BaseUnaryInterceptingCall extends BaseInterceptingCall implements Intercep
* BaseInterceptingCall with special-cased behavior for methods with streaming
* responses.
*/
class BaseStreamingInterceptingCall extends BaseInterceptingCall implements InterceptingCallInterface { }
class BaseStreamingInterceptingCall extends BaseInterceptingCall
implements InterceptingCallInterface {}
function getBottomInterceptingCall(channel: Channel, path: string, options: InterceptorOptions, methodDefinition: ClientMethodDefinition<any, any>) {
// tslint:disable-next-line no-any
function getBottomInterceptingCall(
channel: Channel,
path: string,
options: InterceptorOptions,
methodDefinition: ClientMethodDefinition<any, any>
) {
const call = getCall(channel, path, options);
if (methodDefinition.responseStream) {
return new BaseStreamingInterceptingCall(call, methodDefinition);
@ -399,49 +471,75 @@ export interface NextCall {
}
export interface Interceptor {
(options: InterceptorOptions, nextCall: NextCall): InterceptingCall
(options: InterceptorOptions, nextCall: NextCall): InterceptingCall;
}
export interface InterceptorProvider {
// tslint:disable-next-line no-any
(methodDefinition: ClientMethodDefinition<any, any>): Interceptor;
}
export interface InterceptorArguments {
clientInterceptors: Interceptor[],
clientInterceptorProviders: InterceptorProvider[],
callInterceptors: Interceptor[],
callInterceptorProviders: InterceptorProvider[]
clientInterceptors: Interceptor[];
clientInterceptorProviders: InterceptorProvider[];
callInterceptors: Interceptor[];
callInterceptorProviders: InterceptorProvider[];
}
export function getInterceptingCall(interceptorArgs: InterceptorArguments, methodDefinition: ClientMethodDefinition<any, any>, options: CallOptions, channel: Channel): InterceptingCallInterface {
if (interceptorArgs.clientInterceptors.length > 0 && interceptorArgs.clientInterceptorProviders.length > 0) {
// tslint:disable-next-line no-any
export function getInterceptingCall(
interceptorArgs: InterceptorArguments,
methodDefinition: ClientMethodDefinition<any, any>,
options: CallOptions,
channel: Channel
): InterceptingCallInterface {
if (
interceptorArgs.clientInterceptors.length > 0 &&
interceptorArgs.clientInterceptorProviders.length > 0
) {
throw new InterceptorConfigurationError(
'Both interceptors and interceptor_providers were passed as options ' +
'to the client constructor. Only one of these is allowed.'
'to the client constructor. Only one of these is allowed.'
);
}
if (interceptorArgs.callInterceptors.length > 0 && interceptorArgs.callInterceptorProviders.length > 0) {
if (
interceptorArgs.callInterceptors.length > 0 &&
interceptorArgs.callInterceptorProviders.length > 0
) {
throw new InterceptorConfigurationError(
'Both interceptors and interceptor_providers were passed as call ' +
'options. Only one of these is allowed.'
'options. Only one of these is allowed.'
);
}
let interceptors: Interceptor[] = [];
// Interceptors passed to the call override interceptors passed to the client constructor
if (interceptorArgs.callInterceptors.length > 0 || interceptorArgs.callInterceptorProviders.length > 0) {
interceptors = ([] as Interceptor[]).concat(
interceptorArgs.callInterceptors,
interceptorArgs.callInterceptorProviders.map(provider => provider(methodDefinition))
).filter(interceptor => interceptor);
if (
interceptorArgs.callInterceptors.length > 0 ||
interceptorArgs.callInterceptorProviders.length > 0
) {
interceptors = ([] as Interceptor[])
.concat(
interceptorArgs.callInterceptors,
interceptorArgs.callInterceptorProviders.map(provider =>
provider(methodDefinition)
)
)
.filter(interceptor => interceptor);
// Filter out falsy values when providers return nothing
} else {
interceptors = ([] as Interceptor[]).concat(
interceptorArgs.clientInterceptors,
interceptorArgs.clientInterceptorProviders.map(provider => provider(methodDefinition))
).filter(interceptor => interceptor);
interceptors = ([] as Interceptor[])
.concat(
interceptorArgs.clientInterceptors,
interceptorArgs.clientInterceptorProviders.map(provider =>
provider(methodDefinition)
)
)
.filter(interceptor => interceptor);
// Filter out falsy values when providers return nothing
}
const interceptorOptions = Object.assign({}, options, {method_definition: methodDefinition});
const interceptorOptions = Object.assign({}, options, {
method_definition: methodDefinition,
});
/* For each interceptor in the list, the nextCall function passed to it is
* based on the next interceptor in the list, using a nextCall function
* constructed with the following interceptor in the list, and so on. The
@ -449,8 +547,17 @@ export function getInterceptingCall(interceptorArgs: InterceptorArguments, metho
* function that invokes getBottomInterceptingCall, the result of which
* handles (de)serialization and also gets the underlying call from the
* channel. */
const getCall: NextCall = interceptors.reduceRight<NextCall>((nextCall: NextCall, nextInterceptor: Interceptor) => {
return currentOptions => nextInterceptor(currentOptions, nextCall);
}, (finalOptions: InterceptorOptions) => getBottomInterceptingCall(channel, methodDefinition.path, finalOptions, methodDefinition));
const getCall: NextCall = interceptors.reduceRight<NextCall>(
(nextCall: NextCall, nextInterceptor: Interceptor) => {
return currentOptions => nextInterceptor(currentOptions, nextCall);
},
(finalOptions: InterceptorOptions) =>
getBottomInterceptingCall(
channel,
methodDefinition.path,
finalOptions,
methodDefinition
)
);
return getCall(interceptorOptions);
}
}

View File

@ -29,14 +29,25 @@ import {
SurfaceCall,
} from './call';
import { CallCredentials } from './call-credentials';
import { Deadline, StatusObject, WriteObject, InterceptingListener } from './call-stream';
import {
Deadline,
StatusObject,
WriteObject,
InterceptingListener,
} from './call-stream';
import { Channel, ConnectivityState, ChannelImplementation } from './channel';
import { ChannelCredentials } from './channel-credentials';
import { ChannelOptions } from './channel-options';
import { Status } from './constants';
import { Metadata } from './metadata';
import { ClientMethodDefinition } from './make-client';
import { getInterceptingCall, Interceptor, InterceptorProvider, InterceptorArguments, InterceptingCallInterface } from './client-interceptors';
import {
getInterceptingCall,
Interceptor,
InterceptorProvider,
InterceptorArguments,
InterceptingCallInterface,
} from './client-interceptors';
const CHANNEL_SYMBOL = Symbol();
const INTERCEPTOR_SYMBOL = Symbol();
@ -53,8 +64,8 @@ export interface CallOptions {
* but the server is not yet implemented so it makes no sense to have it */
propagate_flags?: number;
credentials?: CallCredentials;
interceptors?: Interceptor[],
interceptor_providers?: InterceptorProvider[]
interceptors?: Interceptor[];
interceptor_providers?: InterceptorProvider[];
}
export type ClientOptions = Partial<ChannelOptions> & {
@ -64,8 +75,8 @@ export type ClientOptions = Partial<ChannelOptions> & {
credentials: ChannelCredentials,
options: ClientOptions
) => Channel;
interceptors?: Interceptor[],
interceptor_providers?: InterceptorProvider[]
interceptors?: Interceptor[];
interceptor_providers?: InterceptorProvider[];
};
/**
@ -98,10 +109,14 @@ export class Client {
}
this[INTERCEPTOR_SYMBOL] = options.interceptors ?? [];
this[INTERCEPTOR_PROVIDER_SYMBOL] = options.interceptor_providers ?? [];
if (this[INTERCEPTOR_SYMBOL].length > 0 && this[INTERCEPTOR_PROVIDER_SYMBOL].length > 0) {
if (
this[INTERCEPTOR_SYMBOL].length > 0 &&
this[INTERCEPTOR_PROVIDER_SYMBOL].length > 0
) {
throw new Error(
'Both interceptors and interceptor_providers were passed as options ' +
'to the client constructor. Only one of these is allowed.');
'to the client constructor. Only one of these is allowed.'
);
}
}
@ -218,20 +233,28 @@ export class Client {
({ metadata, options, callback } = this.checkOptionalUnaryResponseArguments<
ResponseType
>(metadata, options, callback));
const methodDefinition: ClientMethodDefinition<RequestType, ResponseType> = {
const methodDefinition: ClientMethodDefinition<
RequestType,
ResponseType
> = {
path: method,
requestStream: false,
responseStream: false,
requestSerialize: serialize,
responseDeserialize: deserialize
responseDeserialize: deserialize,
};
const interceptorArgs: InterceptorArguments = {
clientInterceptors: this[INTERCEPTOR_SYMBOL],
clientInterceptorProviders: this[INTERCEPTOR_PROVIDER_SYMBOL],
callInterceptors: options.interceptors ?? [],
callInterceptorProviders: options.interceptor_providers ?? []
callInterceptorProviders: options.interceptor_providers ?? [],
};
const call: InterceptingCallInterface = getInterceptingCall(interceptorArgs, methodDefinition, options, this[CHANNEL_SYMBOL]);
const call: InterceptingCallInterface = getInterceptingCall(
interceptorArgs,
methodDefinition,
options,
this[CHANNEL_SYMBOL]
);
if (options.credentials) {
call.setCredentials(options.credentials);
}
@ -239,9 +262,10 @@ export class Client {
let responseMessage: ResponseType | null = null;
let receivedStatus = false;
call.start(metadata, {
onReceiveMetadata: (metadata) => {
onReceiveMetadata: metadata => {
emitter.emit('metadata', metadata);
},
// tslint:disable-next-line no-any
onReceiveMessage(message: any) {
if (responseMessage != null) {
call.cancelWithStatus(Status.INTERNAL, 'Too many responses received');
@ -259,7 +283,7 @@ export class Client {
callback!(callErrorFromStatus(status));
}
emitter.emit('status', status);
}
},
});
call.sendMessage(argument);
call.halfClose();
@ -305,20 +329,28 @@ export class Client {
({ metadata, options, callback } = this.checkOptionalUnaryResponseArguments<
ResponseType
>(metadata, options, callback));
const methodDefinition: ClientMethodDefinition<RequestType, ResponseType> = {
const methodDefinition: ClientMethodDefinition<
RequestType,
ResponseType
> = {
path: method,
requestStream: true,
responseStream: false,
requestSerialize: serialize,
responseDeserialize: deserialize
responseDeserialize: deserialize,
};
const interceptorArgs: InterceptorArguments = {
clientInterceptors: this[INTERCEPTOR_SYMBOL],
clientInterceptorProviders: this[INTERCEPTOR_PROVIDER_SYMBOL],
callInterceptors: options.interceptors ?? [],
callInterceptorProviders: options.interceptor_providers ?? []
callInterceptorProviders: options.interceptor_providers ?? [],
};
const call: InterceptingCallInterface = getInterceptingCall(interceptorArgs, methodDefinition, options, this[CHANNEL_SYMBOL]);
const call: InterceptingCallInterface = getInterceptingCall(
interceptorArgs,
methodDefinition,
options,
this[CHANNEL_SYMBOL]
);
if (options.credentials) {
call.setCredentials(options.credentials);
}
@ -326,9 +358,10 @@ export class Client {
let responseMessage: ResponseType | null = null;
let receivedStatus = false;
call.start(metadata, {
onReceiveMetadata: (metadata) => {
onReceiveMetadata: metadata => {
emitter.emit('metadata', metadata);
},
// tslint:disable-next-line no-any
onReceiveMessage(message: any) {
if (responseMessage != null) {
call.cancelWithStatus(Status.INTERNAL, 'Too many responses received');
@ -346,7 +379,7 @@ export class Client {
callback!(callErrorFromStatus(status));
}
emitter.emit('status', status);
}
},
});
return emitter;
}
@ -399,29 +432,41 @@ export class Client {
options?: CallOptions
): ClientReadableStream<ResponseType> {
({ metadata, options } = this.checkMetadataAndOptions(metadata, options));
const methodDefinition: ClientMethodDefinition<RequestType, ResponseType> = {
const methodDefinition: ClientMethodDefinition<
RequestType,
ResponseType
> = {
path: method,
requestStream: false,
responseStream: true,
requestSerialize: serialize,
responseDeserialize: deserialize
responseDeserialize: deserialize,
};
const interceptorArgs: InterceptorArguments = {
clientInterceptors: this[INTERCEPTOR_SYMBOL],
clientInterceptorProviders: this[INTERCEPTOR_PROVIDER_SYMBOL],
callInterceptors: options.interceptors ?? [],
callInterceptorProviders: options.interceptor_providers ?? []
callInterceptorProviders: options.interceptor_providers ?? [],
};
const call: InterceptingCallInterface = getInterceptingCall(interceptorArgs, methodDefinition, options, this[CHANNEL_SYMBOL]);
const call: InterceptingCallInterface = getInterceptingCall(
interceptorArgs,
methodDefinition,
options,
this[CHANNEL_SYMBOL]
);
if (options.credentials) {
call.setCredentials(options.credentials);
}
const stream = new ClientReadableStreamImpl<ResponseType>(call, deserialize);
const stream = new ClientReadableStreamImpl<ResponseType>(
call,
deserialize
);
let receivedStatus = false;
call.start(metadata, {
onReceiveMetadata(metadata: Metadata) {
stream.emit('metadata', metadata);
},
// tslint:disable-next-line no-any
onReceiveMessage(message: any) {
if (stream.push(message)) {
call.startRead();
@ -437,7 +482,7 @@ export class Client {
stream.emit('error', callErrorFromStatus(status));
}
stream.emit('status', status);
}
},
});
call.sendMessage(argument);
call.halfClose();
@ -465,20 +510,28 @@ export class Client {
options?: CallOptions
): ClientDuplexStream<RequestType, ResponseType> {
({ metadata, options } = this.checkMetadataAndOptions(metadata, options));
const methodDefinition: ClientMethodDefinition<RequestType, ResponseType> = {
const methodDefinition: ClientMethodDefinition<
RequestType,
ResponseType
> = {
path: method,
requestStream: true,
responseStream: true,
requestSerialize: serialize,
responseDeserialize: deserialize
responseDeserialize: deserialize,
};
const interceptorArgs: InterceptorArguments = {
clientInterceptors: this[INTERCEPTOR_SYMBOL],
clientInterceptorProviders: this[INTERCEPTOR_PROVIDER_SYMBOL],
callInterceptors: options.interceptors ?? [],
callInterceptorProviders: options.interceptor_providers ?? []
callInterceptorProviders: options.interceptor_providers ?? [],
};
const call: InterceptingCallInterface = getInterceptingCall(interceptorArgs, methodDefinition, options, this[CHANNEL_SYMBOL]);
const call: InterceptingCallInterface = getInterceptingCall(
interceptorArgs,
methodDefinition,
options,
this[CHANNEL_SYMBOL]
);
if (options.credentials) {
call.setCredentials(options.credentials);
}
@ -507,7 +560,7 @@ export class Client {
stream.emit('error', callErrorFromStatus(status));
}
stream.emit('status', status);
}
},
});
return stream;
}

View File

@ -284,14 +284,15 @@ export { StatusBuilder };
export { Listener } from './call-stream';
export {
Requester,
ListenerBuilder,
RequesterBuilder,
Interceptor,
InterceptorProvider,
export {
Requester,
ListenerBuilder,
RequesterBuilder,
Interceptor,
InterceptorProvider,
InterceptingCall,
InterceptorConfigurationError } from './client-interceptors';
InterceptorConfigurationError,
} from './client-interceptors';
export { GrpcObject } from './make-client';

View File

@ -45,8 +45,9 @@ export interface ServerMethodDefinition<RequestType, ResponseType> {
originalName?: string;
}
export interface MethodDefinition<RequestType, ResponseType> extends ClientMethodDefinition<RequestType, ResponseType>, ServerMethodDefinition<RequestType, ResponseType> {
}
export interface MethodDefinition<RequestType, ResponseType>
extends ClientMethodDefinition<RequestType, ResponseType>,
ServerMethodDefinition<RequestType, ResponseType> {}
export interface ServiceDefinition {
// tslint:disable-next-line no-any