diff --git a/packages/grpc-js-core/src/call-stream.ts b/packages/grpc-js-core/src/call-stream.ts index 5d5a5b28..877978a8 100644 --- a/packages/grpc-js-core/src/call-stream.ts +++ b/packages/grpc-js-core/src/call-stream.ts @@ -3,6 +3,7 @@ import {Duplex} from 'stream'; import {CallCredentials} from './call-credentials'; import {Status} from './constants'; +import {EmitterAugmentation1} from './events'; import {Filter} from './filter'; import {FilterStackFactory} from './filter-stack'; import {Metadata} from './metadata'; @@ -34,7 +35,7 @@ export interface WriteObject { /** * This interface represents a duplex stream associated with a single gRPC call. */ -export interface CallStream extends ObjectDuplex { +export type CallStream = { cancelWithStatus(status: Status, details: string): void; getPeer(): string; @@ -43,37 +44,9 @@ export interface CallStream extends ObjectDuplex { /* If the return value is null, the call has not ended yet. Otherwise, it has * ended with the specified status */ getStatus(): StatusObject|null; - - addListener(event: string, listener: Function): this; - emit(event: string|symbol, ...args: any[]): boolean; - on(event: string, listener: Function): this; - once(event: string, listener: Function): this; - prependListener(event: string, listener: Function): this; - prependOnceListener(event: string, listener: Function): this; - removeListener(event: string, listener: Function): this; - - addListener(event: 'metadata', listener: (metadata: Metadata) => void): this; - emit(event: 'metadata', metadata: Metadata): boolean; - on(event: 'metadata', listener: (metadata: Metadata) => void): this; - once(event: 'metadata', listener: (metadata: Metadata) => void): this; - prependListener(event: 'metadata', listener: (metadata: Metadata) => void): - this; - prependOnceListener( - event: 'metadata', listener: (metadata: Metadata) => void): this; - removeListener(event: 'metadata', listener: (metadata: Metadata) => void): - this; - - addListener(event: 'status', listener: (status: StatusObject) => void): this; - emit(event: 'status', status: StatusObject): boolean; - on(event: 'status', listener: (status: StatusObject) => void): this; - once(event: 'status', listener: (status: StatusObject) => void): this; - prependListener(event: 'status', listener: (status: StatusObject) => void): - this; - prependOnceListener( - event: 'status', listener: (status: StatusObject) => void): this; - removeListener(event: 'status', listener: (status: StatusObject) => void): - this; -} +} & EmitterAugmentation1<'metadata', Metadata> + & EmitterAugmentation1<'status', StatusObject> + & ObjectDuplex; enum ReadState { NO_DATA, diff --git a/packages/grpc-js-core/src/call.ts b/packages/grpc-js-core/src/call.ts index a4ae4479..5abc0479 100644 --- a/packages/grpc-js-core/src/call.ts +++ b/packages/grpc-js-core/src/call.ts @@ -1,4 +1,5 @@ import {EventEmitter} from 'events'; +import {EmitterAugmentation1} from './events'; import {Duplex, Readable, Writable} from 'stream'; import {CallStream, StatusObject, WriteObject} from './call-stream'; @@ -16,44 +17,27 @@ export class ServiceErrorImpl extends Error implements ServiceError { metadata?: Metadata; } -export interface Call extends EventEmitter { +export type Call = { cancel(): void; getPeer(): string; +} & EmitterAugmentation1<'metadata', Metadata> + & EmitterAugmentation1<'status', StatusObject> + & EventEmitter; - addListener(event: string, listener: Function): this; - emit(event: string|symbol, ...args: any[]): boolean; - on(event: string, listener: Function): this; - once(event: string, listener: Function): this; - prependListener(event: string, listener: Function): this; - prependOnceListener(event: string, listener: Function): this; - removeListener(event: string, listener: Function): this; +export type ClientUnaryCall = Call; - addListener(event: 'metadata', listener: (metadata: Metadata) => void): this; - emit(event: 'metadata', metadata: Metadata): boolean; - on(event: 'metadata', listener: (metadata: Metadata) => void): this; - once(event: 'metadata', listener: (metadata: Metadata) => void): this; - prependListener(event: 'metadata', listener: (metadata: Metadata) => void): - this; - prependOnceListener( - event: 'metadata', listener: (metadata: Metadata) => void): this; - removeListener(event: 'metadata', listener: (metadata: Metadata) => void): - this; +export type ClientReadableStream = { + deserialize: (chunk: Buffer) => ResponseType; +} & Call & ObjectReadable; - addListener(event: 'status', listener: (status: StatusObject) => void): this; - emit(event: 'status', status: StatusObject): boolean; - on(event: 'status', listener: (status: StatusObject) => void): this; - once(event: 'status', listener: (status: StatusObject) => void): this; - prependListener(event: 'status', listener: (status: StatusObject) => void): - this; - prependOnceListener( - event: 'status', listener: (status: StatusObject) => void): this; - removeListener(event: 'status', listener: (status: StatusObject) => void): - this; -} +export type ClientWritableStream = { + serialize: (value: RequestType) => Buffer; +} & Call & ObjectWritable; -export interface ClientUnaryCall extends Call {} +export type ClientDuplexStream = + ClientWritableStream & ClientReadableStream; -export class ClientUnaryCallImpl extends EventEmitter implements Call { +export class ClientUnaryCallImpl extends EventEmitter implements ClientUnaryCall { constructor(private readonly call: CallStream) { super(); call.on('metadata', (metadata: Metadata) => { @@ -73,43 +57,6 @@ export class ClientUnaryCallImpl extends EventEmitter implements Call { } } -export interface ClientReadableStream extends - Call, ObjectReadable { - deserialize: (chunk: Buffer) => ResponseType; - - addListener(event: string, listener: Function): this; - emit(event: string|symbol, ...args: any[]): boolean; - on(event: string, listener: Function): this; - once(event: string, listener: Function): this; - prependListener(event: string, listener: Function): this; - prependOnceListener(event: string, listener: Function): this; - removeListener(event: string, listener: Function): this; -} - -export interface ClientWritableStream extends - Call, ObjectWritable { - serialize: (value: RequestType) => Buffer; - - addListener(event: string, listener: Function): this; - emit(event: string|symbol, ...args: any[]): boolean; - on(event: string, listener: Function): this; - once(event: string, listener: Function): this; - prependListener(event: string, listener: Function): this; - prependOnceListener(event: string, listener: Function): this; - removeListener(event: string, listener: Function): this; -} - -export interface ClientDuplexStream extends - ClientWritableStream, ClientReadableStream { - addListener(event: string, listener: Function): this; - emit(event: string|symbol, ...args: any[]): boolean; - on(event: string, listener: Function): this; - once(event: string, listener: Function): this; - prependListener(event: string, listener: Function): this; - prependOnceListener(event: string, listener: Function): this; - removeListener(event: string, listener: Function): this; -} - function setUpReadableStream( stream: ClientReadableStream, call: CallStream, deserialize: (chunk: Buffer) => ResponseType): void { diff --git a/packages/grpc-js-core/src/client.ts b/packages/grpc-js-core/src/client.ts index 13849ab7..862d212c 100644 --- a/packages/grpc-js-core/src/client.ts +++ b/packages/grpc-js-core/src/client.ts @@ -135,7 +135,6 @@ export class Client { method: string, serialize: (value: RequestType) => Buffer, deserialize: (value: Buffer) => ResponseType, argument: RequestType, callback: UnaryCallback): ClientUnaryCall; - makeUnaryRequest( method: string, serialize: (value: RequestType) => Buffer, deserialize: (value: Buffer) => ResponseType, argument: RequestType, @@ -147,14 +146,13 @@ export class Client { metadata, options, callback)); const call: CallStream = this.channel.createStream(method, metadata, options); - const emitter: ClientUnaryCall = new ClientUnaryCallImpl(call); const message: Buffer = serialize(argument); const writeObj: WriteObject = {message: message}; writeObj.flags = options.flags; call.write(writeObj); call.end(); this.handleUnaryResponse(call, deserialize, callback); - return emitter; + return new ClientUnaryCallImpl(call); } makeClientStreamRequest( @@ -174,7 +172,6 @@ export class Client { method: string, serialize: (value: RequestType) => Buffer, deserialize: (value: Buffer) => ResponseType, callback: UnaryCallback): ClientWritableStream; - makeClientStreamRequest( method: string, serialize: (value: RequestType) => Buffer, deserialize: (value: Buffer) => ResponseType, @@ -187,10 +184,8 @@ export class Client { metadata, options, callback)); const call: CallStream = this.channel.createStream(method, metadata, options); - const stream: ClientWritableStream = - new ClientWritableStreamImpl(call, serialize); this.handleUnaryResponse(call, deserialize, callback); - return stream; + return new ClientWritableStreamImpl(call, serialize); } private checkMetadataAndOptions( @@ -233,14 +228,12 @@ export class Client { ({metadata, options} = this.checkMetadataAndOptions(metadata, options)); const call: CallStream = this.channel.createStream(method, metadata, options); - const stream: ClientReadableStream = - new ClientReadableStreamImpl(call, deserialize); const message: Buffer = serialize(argument); const writeObj: WriteObject = {message: message}; writeObj.flags = options.flags; call.write(writeObj); call.end(); - return stream; + return new ClientReadableStreamImpl(call, deserialize); } makeBidiStreamRequest( @@ -259,9 +252,7 @@ export class Client { ({metadata, options} = this.checkMetadataAndOptions(metadata, options)); const call: CallStream = this.channel.createStream(method, metadata, options); - const stream: ClientDuplexStream = - new ClientDuplexStreamImpl( - call, serialize, deserialize); - return stream; + return new ClientDuplexStreamImpl( + call, serialize, deserialize); } } diff --git a/packages/grpc-js-core/src/events.ts b/packages/grpc-js-core/src/events.ts new file mode 100644 index 00000000..591120d1 --- /dev/null +++ b/packages/grpc-js-core/src/events.ts @@ -0,0 +1,29 @@ +export interface EmitterAugmentation0 { + addListener(event: Name, listener: () => void): this; + emit(event: Name): boolean; + on(event: Name, listener: () => void): this; + once(event: Name, listener: () => void): this; + prependListener(event: Name, listener: () => void): this; + prependOnceListener(event: Name, listener: () => void): this; + removeListener(event: Name, listener: () => void): this; +} + +export interface EmitterAugmentation1 { + addListener(event: Name, listener: (arg1: Arg) => void): this; + emit(event: Name, arg1: Arg): boolean; + on(event: Name, listener: (arg1: Arg) => void): this; + once(event: Name, listener: (arg1: Arg) => void): this; + prependListener(event: Name, listener: (arg1: Arg) => void): this; + prependOnceListener(event: Name, listener: (arg1: Arg) => void): this; + removeListener(event: Name, listener: (arg1: Arg) => void): this; +} + +export interface EmitterAugmentation2 { + addListener(event: Name, listener: (arg1: Arg1, arg2: Arg2) => void): this; + emit(event: Name, arg1: Arg1, arg2: Arg2): boolean; + on(event: Name, listener: (arg1: Arg1, arg2: Arg2) => void): this; + once(event: Name, listener: (arg1: Arg1, arg2: Arg2) => void): this; + prependListener(event: Name, listener: (arg1: Arg1, arg2: Arg2) => void): this; + prependOnceListener(event: Name, listener: (arg1: Arg1, arg2: Arg2) => void): this; + removeListener(event: Name, listener: (arg1: Arg1, arg2: Arg2) => void): this; +} diff --git a/packages/grpc-js-core/src/object-stream.ts b/packages/grpc-js-core/src/object-stream.ts index bed77f04..48988daa 100644 --- a/packages/grpc-js-core/src/object-stream.ts +++ b/packages/grpc-js-core/src/object-stream.ts @@ -1,28 +1,14 @@ import {Duplex, Readable, Writable} from 'stream'; +import {EmitterAugmentation1} from './events'; export interface IntermediateObjectReadable extends Readable { read(size?: number): any&T; } -export interface ObjectReadable extends IntermediateObjectReadable { +export type ObjectReadable = { read(size?: number): T; - - addListener(event: string, listener: Function): this; - emit(event: string|symbol, ...args: any[]): boolean; - on(event: string, listener: Function): this; - once(event: string, listener: Function): this; - prependListener(event: string, listener: Function): this; - prependOnceListener(event: string, listener: Function): this; - removeListener(event: string, listener: Function): this; - - addListener(event: 'data', listener: (chunk: T) => void): this; - emit(event: 'data', chunk: T): boolean; - on(event: 'data', listener: (chunk: T) => void): this; - once(event: 'data', listener: (chunk: T) => void): this; - prependListener(event: 'data', listener: (chunk: T) => void): this; - prependOnceListener(event: 'data', listener: (chunk: T) => void): this; - removeListener(event: 'data', listener: (chunk: T) => void): this; -} +} & EmitterAugmentation1<'data', T> + & IntermediateObjectReadable; export interface IntermediateObjectWritable extends Writable { _write(chunk: any&T, encoding: string, callback: Function): void; @@ -44,8 +30,7 @@ export interface ObjectWritable extends IntermediateObjectWritable { end(chunk: T, encoding?: any, cb?: Function): void; } -export interface ObjectDuplex extends Duplex, ObjectWritable, - ObjectReadable { +export type ObjectDuplex = { read(size?: number): U; _write(chunk: T, encoding: string, callback: Function): void; @@ -54,13 +39,4 @@ export interface ObjectDuplex extends Duplex, ObjectWritable, end(): void; end(chunk: T, cb?: Function): void; end(chunk: T, encoding?: any, cb?: Function): void; - - - addListener(event: string, listener: Function): this; - emit(event: string|symbol, ...args: any[]): boolean; - on(event: string, listener: Function): this; - once(event: string, listener: Function): this; - prependListener(event: string, listener: Function): this; - prependOnceListener(event: string, listener: Function): this; - removeListener(event: string, listener: Function): this; -} +} & Duplex & ObjectWritable & ObjectReadable;