From cc231f1f67a4703d428a5b83fafa36bb035e130f Mon Sep 17 00:00:00 2001 From: Kelvin Jin Date: Mon, 18 Dec 2017 15:24:11 -0800 Subject: [PATCH 1/2] grpc-js-core: merge all eventemitter overloads --- packages/grpc-js-core/src/call-stream.ts | 37 ++-------- packages/grpc-js-core/src/call.ts | 83 ++++------------------ packages/grpc-js-core/src/client.ts | 14 ++-- packages/grpc-js-core/src/events.ts | 29 ++++++++ packages/grpc-js-core/src/object-stream.ts | 36 ++-------- 5 files changed, 63 insertions(+), 136 deletions(-) create mode 100644 packages/grpc-js-core/src/events.ts diff --git a/packages/grpc-js-core/src/call-stream.ts b/packages/grpc-js-core/src/call-stream.ts index 3c3b5cfa..4b4980c5 100644 --- a/packages/grpc-js-core/src/call-stream.ts +++ b/packages/grpc-js-core/src/call-stream.ts @@ -3,6 +3,7 @@ import {Duplex} from 'stream'; import {CallCredentials} from './call-credentials'; import {Status} from './constants'; +import {EmitterAugmentation1} from './events'; import {Filter} from './filter'; import {FilterStackFactory} from './filter-stack'; import {Metadata} from './metadata'; @@ -34,7 +35,7 @@ export interface WriteObject { /** * This interface represents a duplex stream associated with a single gRPC call. */ -export interface CallStream extends ObjectDuplex { +export type CallStream = { cancelWithStatus(status: Status, details: string): void; getPeer(): string; @@ -43,37 +44,9 @@ export interface CallStream extends ObjectDuplex { /* If the return value is null, the call has not ended yet. Otherwise, it has * ended with the specified status */ getStatus(): StatusObject|null; - - addListener(event: string, listener: Function): this; - emit(event: string|symbol, ...args: any[]): boolean; - on(event: string, listener: Function): this; - once(event: string, listener: Function): this; - prependListener(event: string, listener: Function): this; - prependOnceListener(event: string, listener: Function): this; - removeListener(event: string, listener: Function): this; - - addListener(event: 'metadata', listener: (metadata: Metadata) => void): this; - emit(event: 'metadata', metadata: Metadata): boolean; - on(event: 'metadata', listener: (metadata: Metadata) => void): this; - once(event: 'metadata', listener: (metadata: Metadata) => void): this; - prependListener(event: 'metadata', listener: (metadata: Metadata) => void): - this; - prependOnceListener( - event: 'metadata', listener: (metadata: Metadata) => void): this; - removeListener(event: 'metadata', listener: (metadata: Metadata) => void): - this; - - addListener(event: 'status', listener: (status: StatusObject) => void): this; - emit(event: 'status', status: StatusObject): boolean; - on(event: 'status', listener: (status: StatusObject) => void): this; - once(event: 'status', listener: (status: StatusObject) => void): this; - prependListener(event: 'status', listener: (status: StatusObject) => void): - this; - prependOnceListener( - event: 'status', listener: (status: StatusObject) => void): this; - removeListener(event: 'status', listener: (status: StatusObject) => void): - this; -} +} & EmitterAugmentation1<'metadata', Metadata> + & EmitterAugmentation1<'status', StatusObject> + & ObjectDuplex; enum ReadState { NO_DATA, diff --git a/packages/grpc-js-core/src/call.ts b/packages/grpc-js-core/src/call.ts index a4ae4479..5abc0479 100644 --- a/packages/grpc-js-core/src/call.ts +++ b/packages/grpc-js-core/src/call.ts @@ -1,4 +1,5 @@ import {EventEmitter} from 'events'; +import {EmitterAugmentation1} from './events'; import {Duplex, Readable, Writable} from 'stream'; import {CallStream, StatusObject, WriteObject} from './call-stream'; @@ -16,44 +17,27 @@ export class ServiceErrorImpl extends Error implements ServiceError { metadata?: Metadata; } -export interface Call extends EventEmitter { +export type Call = { cancel(): void; getPeer(): string; +} & EmitterAugmentation1<'metadata', Metadata> + & EmitterAugmentation1<'status', StatusObject> + & EventEmitter; - addListener(event: string, listener: Function): this; - emit(event: string|symbol, ...args: any[]): boolean; - on(event: string, listener: Function): this; - once(event: string, listener: Function): this; - prependListener(event: string, listener: Function): this; - prependOnceListener(event: string, listener: Function): this; - removeListener(event: string, listener: Function): this; +export type ClientUnaryCall = Call; - addListener(event: 'metadata', listener: (metadata: Metadata) => void): this; - emit(event: 'metadata', metadata: Metadata): boolean; - on(event: 'metadata', listener: (metadata: Metadata) => void): this; - once(event: 'metadata', listener: (metadata: Metadata) => void): this; - prependListener(event: 'metadata', listener: (metadata: Metadata) => void): - this; - prependOnceListener( - event: 'metadata', listener: (metadata: Metadata) => void): this; - removeListener(event: 'metadata', listener: (metadata: Metadata) => void): - this; +export type ClientReadableStream = { + deserialize: (chunk: Buffer) => ResponseType; +} & Call & ObjectReadable; - addListener(event: 'status', listener: (status: StatusObject) => void): this; - emit(event: 'status', status: StatusObject): boolean; - on(event: 'status', listener: (status: StatusObject) => void): this; - once(event: 'status', listener: (status: StatusObject) => void): this; - prependListener(event: 'status', listener: (status: StatusObject) => void): - this; - prependOnceListener( - event: 'status', listener: (status: StatusObject) => void): this; - removeListener(event: 'status', listener: (status: StatusObject) => void): - this; -} +export type ClientWritableStream = { + serialize: (value: RequestType) => Buffer; +} & Call & ObjectWritable; -export interface ClientUnaryCall extends Call {} +export type ClientDuplexStream = + ClientWritableStream & ClientReadableStream; -export class ClientUnaryCallImpl extends EventEmitter implements Call { +export class ClientUnaryCallImpl extends EventEmitter implements ClientUnaryCall { constructor(private readonly call: CallStream) { super(); call.on('metadata', (metadata: Metadata) => { @@ -73,43 +57,6 @@ export class ClientUnaryCallImpl extends EventEmitter implements Call { } } -export interface ClientReadableStream extends - Call, ObjectReadable { - deserialize: (chunk: Buffer) => ResponseType; - - addListener(event: string, listener: Function): this; - emit(event: string|symbol, ...args: any[]): boolean; - on(event: string, listener: Function): this; - once(event: string, listener: Function): this; - prependListener(event: string, listener: Function): this; - prependOnceListener(event: string, listener: Function): this; - removeListener(event: string, listener: Function): this; -} - -export interface ClientWritableStream extends - Call, ObjectWritable { - serialize: (value: RequestType) => Buffer; - - addListener(event: string, listener: Function): this; - emit(event: string|symbol, ...args: any[]): boolean; - on(event: string, listener: Function): this; - once(event: string, listener: Function): this; - prependListener(event: string, listener: Function): this; - prependOnceListener(event: string, listener: Function): this; - removeListener(event: string, listener: Function): this; -} - -export interface ClientDuplexStream extends - ClientWritableStream, ClientReadableStream { - addListener(event: string, listener: Function): this; - emit(event: string|symbol, ...args: any[]): boolean; - on(event: string, listener: Function): this; - once(event: string, listener: Function): this; - prependListener(event: string, listener: Function): this; - prependOnceListener(event: string, listener: Function): this; - removeListener(event: string, listener: Function): this; -} - function setUpReadableStream( stream: ClientReadableStream, call: CallStream, deserialize: (chunk: Buffer) => ResponseType): void { diff --git a/packages/grpc-js-core/src/client.ts b/packages/grpc-js-core/src/client.ts index 13849ab7..ade3780b 100644 --- a/packages/grpc-js-core/src/client.ts +++ b/packages/grpc-js-core/src/client.ts @@ -135,7 +135,6 @@ export class Client { method: string, serialize: (value: RequestType) => Buffer, deserialize: (value: Buffer) => ResponseType, argument: RequestType, callback: UnaryCallback): ClientUnaryCall; - makeUnaryRequest( method: string, serialize: (value: RequestType) => Buffer, deserialize: (value: Buffer) => ResponseType, argument: RequestType, @@ -147,13 +146,14 @@ export class Client { metadata, options, callback)); const call: CallStream = this.channel.createStream(method, metadata, options); - const emitter: ClientUnaryCall = new ClientUnaryCallImpl(call); const message: Buffer = serialize(argument); const writeObj: WriteObject = {message: message}; writeObj.flags = options.flags; call.write(writeObj); call.end(); this.handleUnaryResponse(call, deserialize, callback); + + const emitter: ClientUnaryCall = new ClientUnaryCallImpl(call); return emitter; } @@ -174,7 +174,6 @@ export class Client { method: string, serialize: (value: RequestType) => Buffer, deserialize: (value: Buffer) => ResponseType, callback: UnaryCallback): ClientWritableStream; - makeClientStreamRequest( method: string, serialize: (value: RequestType) => Buffer, deserialize: (value: Buffer) => ResponseType, @@ -187,9 +186,10 @@ export class Client { metadata, options, callback)); const call: CallStream = this.channel.createStream(method, metadata, options); + this.handleUnaryResponse(call, deserialize, callback); + const stream: ClientWritableStream = new ClientWritableStreamImpl(call, serialize); - this.handleUnaryResponse(call, deserialize, callback); return stream; } @@ -233,13 +233,14 @@ export class Client { ({metadata, options} = this.checkMetadataAndOptions(metadata, options)); const call: CallStream = this.channel.createStream(method, metadata, options); - const stream: ClientReadableStream = - new ClientReadableStreamImpl(call, deserialize); const message: Buffer = serialize(argument); const writeObj: WriteObject = {message: message}; writeObj.flags = options.flags; call.write(writeObj); call.end(); + + const stream: ClientReadableStream = + new ClientReadableStreamImpl(call, deserialize); return stream; } @@ -259,6 +260,7 @@ export class Client { ({metadata, options} = this.checkMetadataAndOptions(metadata, options)); const call: CallStream = this.channel.createStream(method, metadata, options); + const stream: ClientDuplexStream = new ClientDuplexStreamImpl( call, serialize, deserialize); diff --git a/packages/grpc-js-core/src/events.ts b/packages/grpc-js-core/src/events.ts new file mode 100644 index 00000000..591120d1 --- /dev/null +++ b/packages/grpc-js-core/src/events.ts @@ -0,0 +1,29 @@ +export interface EmitterAugmentation0 { + addListener(event: Name, listener: () => void): this; + emit(event: Name): boolean; + on(event: Name, listener: () => void): this; + once(event: Name, listener: () => void): this; + prependListener(event: Name, listener: () => void): this; + prependOnceListener(event: Name, listener: () => void): this; + removeListener(event: Name, listener: () => void): this; +} + +export interface EmitterAugmentation1 { + addListener(event: Name, listener: (arg1: Arg) => void): this; + emit(event: Name, arg1: Arg): boolean; + on(event: Name, listener: (arg1: Arg) => void): this; + once(event: Name, listener: (arg1: Arg) => void): this; + prependListener(event: Name, listener: (arg1: Arg) => void): this; + prependOnceListener(event: Name, listener: (arg1: Arg) => void): this; + removeListener(event: Name, listener: (arg1: Arg) => void): this; +} + +export interface EmitterAugmentation2 { + addListener(event: Name, listener: (arg1: Arg1, arg2: Arg2) => void): this; + emit(event: Name, arg1: Arg1, arg2: Arg2): boolean; + on(event: Name, listener: (arg1: Arg1, arg2: Arg2) => void): this; + once(event: Name, listener: (arg1: Arg1, arg2: Arg2) => void): this; + prependListener(event: Name, listener: (arg1: Arg1, arg2: Arg2) => void): this; + prependOnceListener(event: Name, listener: (arg1: Arg1, arg2: Arg2) => void): this; + removeListener(event: Name, listener: (arg1: Arg1, arg2: Arg2) => void): this; +} diff --git a/packages/grpc-js-core/src/object-stream.ts b/packages/grpc-js-core/src/object-stream.ts index bed77f04..48988daa 100644 --- a/packages/grpc-js-core/src/object-stream.ts +++ b/packages/grpc-js-core/src/object-stream.ts @@ -1,28 +1,14 @@ import {Duplex, Readable, Writable} from 'stream'; +import {EmitterAugmentation1} from './events'; export interface IntermediateObjectReadable extends Readable { read(size?: number): any&T; } -export interface ObjectReadable extends IntermediateObjectReadable { +export type ObjectReadable = { read(size?: number): T; - - addListener(event: string, listener: Function): this; - emit(event: string|symbol, ...args: any[]): boolean; - on(event: string, listener: Function): this; - once(event: string, listener: Function): this; - prependListener(event: string, listener: Function): this; - prependOnceListener(event: string, listener: Function): this; - removeListener(event: string, listener: Function): this; - - addListener(event: 'data', listener: (chunk: T) => void): this; - emit(event: 'data', chunk: T): boolean; - on(event: 'data', listener: (chunk: T) => void): this; - once(event: 'data', listener: (chunk: T) => void): this; - prependListener(event: 'data', listener: (chunk: T) => void): this; - prependOnceListener(event: 'data', listener: (chunk: T) => void): this; - removeListener(event: 'data', listener: (chunk: T) => void): this; -} +} & EmitterAugmentation1<'data', T> + & IntermediateObjectReadable; export interface IntermediateObjectWritable extends Writable { _write(chunk: any&T, encoding: string, callback: Function): void; @@ -44,8 +30,7 @@ export interface ObjectWritable extends IntermediateObjectWritable { end(chunk: T, encoding?: any, cb?: Function): void; } -export interface ObjectDuplex extends Duplex, ObjectWritable, - ObjectReadable { +export type ObjectDuplex = { read(size?: number): U; _write(chunk: T, encoding: string, callback: Function): void; @@ -54,13 +39,4 @@ export interface ObjectDuplex extends Duplex, ObjectWritable, end(): void; end(chunk: T, cb?: Function): void; end(chunk: T, encoding?: any, cb?: Function): void; - - - addListener(event: string, listener: Function): this; - emit(event: string|symbol, ...args: any[]): boolean; - on(event: string, listener: Function): this; - once(event: string, listener: Function): this; - prependListener(event: string, listener: Function): this; - prependOnceListener(event: string, listener: Function): this; - removeListener(event: string, listener: Function): this; -} +} & Duplex & ObjectWritable & ObjectReadable; From 7f0b5b96f73af4d1fada52f97c38ed8d988e5f9a Mon Sep 17 00:00:00 2001 From: Kelvin Jin Date: Mon, 18 Dec 2017 15:24:11 -0800 Subject: [PATCH 2/2] grpc-js-core: don't use local emitter vars --- packages/grpc-js-core/src/client.ts | 21 +++++---------------- 1 file changed, 5 insertions(+), 16 deletions(-) diff --git a/packages/grpc-js-core/src/client.ts b/packages/grpc-js-core/src/client.ts index ade3780b..862d212c 100644 --- a/packages/grpc-js-core/src/client.ts +++ b/packages/grpc-js-core/src/client.ts @@ -152,9 +152,7 @@ export class Client { call.write(writeObj); call.end(); this.handleUnaryResponse(call, deserialize, callback); - - const emitter: ClientUnaryCall = new ClientUnaryCallImpl(call); - return emitter; + return new ClientUnaryCallImpl(call); } makeClientStreamRequest( @@ -187,10 +185,7 @@ export class Client { const call: CallStream = this.channel.createStream(method, metadata, options); this.handleUnaryResponse(call, deserialize, callback); - - const stream: ClientWritableStream = - new ClientWritableStreamImpl(call, serialize); - return stream; + return new ClientWritableStreamImpl(call, serialize); } private checkMetadataAndOptions( @@ -238,10 +233,7 @@ export class Client { writeObj.flags = options.flags; call.write(writeObj); call.end(); - - const stream: ClientReadableStream = - new ClientReadableStreamImpl(call, deserialize); - return stream; + return new ClientReadableStreamImpl(call, deserialize); } makeBidiStreamRequest( @@ -260,10 +252,7 @@ export class Client { ({metadata, options} = this.checkMetadataAndOptions(metadata, options)); const call: CallStream = this.channel.createStream(method, metadata, options); - - const stream: ClientDuplexStream = - new ClientDuplexStreamImpl( - call, serialize, deserialize); - return stream; + return new ClientDuplexStreamImpl( + call, serialize, deserialize); } }