diff --git a/packages/grpc-js/src/server-call.ts b/packages/grpc-js/src/server-call.ts index 57c3d20d..68a754b7 100644 --- a/packages/grpc-js/src/server-call.ts +++ b/packages/grpc-js/src/server-call.ts @@ -16,62 +16,74 @@ */ import {EventEmitter} from 'events'; +import * as http2 from 'http2'; import {Duplex, Readable, Writable} from 'stream'; + import {ServiceError} from './call'; +import {Status} from './constants'; import {Deserialize, Serialize} from './make-client'; import {Metadata} from './metadata'; +function noop(): void {} -export class ServerUnaryCall extends EventEmitter { - cancelled: boolean; - request: RequestType|null; +export type PartialServiceError = Partial; - constructor(private call: ServerCall, public metadata: Metadata) { - super(); - this.cancelled = false; - this.request = null; // TODO(cjihrig): Read the unary request here. - } +type DeadlineUnitIndexSignature = { + [name: string]: number +}; - getPeer(): string { - throw new Error('not implemented yet'); - } - - sendMetadata(responseMetadata: Metadata): void { - throw new Error('not implemented yet'); - } -} +const GRPC_ACCEPT_ENCODING_HEADER = 'grpc-accept-encoding'; +const GRPC_ENCODING_HEADER = 'grpc-encoding'; +const GRPC_MESSAGE_HEADER = 'grpc-message'; +const GRPC_STATUS_HEADER = 'grpc-status'; +const GRPC_TIMEOUT_HEADER = 'grpc-timeout'; +const DEADLINE_REGEX = /(\d{1,8})\s*([HMSmun])/; +const deadlineUnitsToMs: DeadlineUnitIndexSignature = { + H: 3600000, + M: 60000, + S: 1000, + m: 1, + u: 0.001, + n: 0.000001 +}; +const defaultResponseHeaders = { + // TODO(cjihrig): Remove these encoding headers from the default response + // once compression is integrated. + [GRPC_ACCEPT_ENCODING_HEADER]: 'identity', + [GRPC_ENCODING_HEADER]: 'identity', + [http2.constants.HTTP2_HEADER_STATUS]: http2.constants.HTTP_STATUS_OK, + [http2.constants.HTTP2_HEADER_CONTENT_TYPE]: 'application/grpc+proto' +}; +const defaultResponseOptions = { + waitForTrailers: true +} as http2.ServerStreamResponseOptions; -export class ServerReadableStream extends Readable { - cancelled: boolean; +export type ServerSurfaceCall = { + cancelled: boolean; getPeer(): string; + sendMetadata(responseMetadata: Metadata): void +}; - constructor( - private call: ServerCall, public metadata: Metadata, - private deserialize: Deserialize) { - super(); - this.cancelled = false; - } +export type ServerUnaryCall = + ServerSurfaceCall&{request: RequestType | null}; +export type ServerReadableStream = + ServerSurfaceCall&Readable; +export type ServerWritableStream = + ServerSurfaceCall&Writable&{request: RequestType | null}; +export type ServerDuplexStream = + ServerSurfaceCall&Duplex; - getPeer(): string { - throw new Error('not implemented yet'); - } - - sendMetadata(responseMetadata: Metadata): void { - throw new Error('not implemented yet'); - } -} - - -export class ServerWritableStream extends Writable { +export class ServerUnaryCallImpl extends EventEmitter + implements ServerUnaryCall { cancelled: boolean; request: RequestType|null; constructor( - private call: ServerCall, public metadata: Metadata, - private serialize: Serialize) { + private call: Http2ServerCallStream, + public metadata: Metadata) { super(); this.cancelled = false; - this.request = null; // TODO(cjihrig): Read the unary request here. + this.request = null; } getPeer(): string { @@ -79,18 +91,109 @@ export class ServerWritableStream extends Writable { } sendMetadata(responseMetadata: Metadata): void { - throw new Error('not implemented yet'); + this.call.sendMetadata(responseMetadata); } } -export class ServerDuplexStream extends Duplex { +export class ServerReadableStreamImpl extends + Readable implements ServerReadableStream { + cancelled: boolean; + private done = false; + + constructor( + private call: Http2ServerCallStream, + public metadata: Metadata, + private _deserialize: Deserialize) { + super(); + this.cancelled = false; + } + + getPeer(): string { + throw new Error('not implemented yet'); + } + + sendMetadata(responseMetadata: Metadata): void { + this.call.sendMetadata(responseMetadata); + } + + _done(): void { + this.done = true; + this.on('data', noop); + } +} + + +export class ServerWritableStreamImpl extends + Writable implements ServerWritableStream { + cancelled: boolean; + request: RequestType|null; + + constructor( + private call: Http2ServerCallStream, + public metadata: Metadata, private _serialize: Serialize) { + super({objectMode: true}); + this.cancelled = false; + this.request = null; + + this.on('error', (err) => { + this.call.sendError(err as ServiceError); + this.end(); + }); + } + + getPeer(): string { + throw new Error('not implemented yet'); + } + + sendMetadata(responseMetadata: Metadata): void { + this.call.sendMetadata(responseMetadata); + } + + async _write(chunk: ResponseType, encoding: string, callback: Function) { + try { + const response = await this.call.serializeMessage(chunk); + this.call.write(response); + } catch (err) { + err.code = Status.INTERNAL; + this.emit('error', err); + } + + callback(null); + } + + _final(callback: Function): void { + this.call.end(); + callback(null); + } + + // tslint:disable-next-line:no-any + end(metadata?: any) { + if (metadata) { + this.call.status.metadata = metadata; + } + + super.end(); + } + + serialize(input: ResponseType): Buffer|null { + if (input === null || input === undefined) { + return null; + } + + return this._serialize(input); + } +} + + +export class ServerDuplexStreamImpl extends Duplex + implements ServerDuplexStream { cancelled: boolean; constructor( - private call: ServerCall, public metadata: Metadata, - private serialize: Serialize, - private deserialize: Deserialize) { + private call: Http2ServerCallStream, + public metadata: Metadata, private _serialize: Serialize, + private _deserialize: Deserialize) { super(); this.cancelled = false; } @@ -100,15 +203,11 @@ export class ServerDuplexStream extends Duplex { } sendMetadata(responseMetadata: Metadata): void { - throw new Error('not implemented yet'); + this.call.sendMetadata(responseMetadata); } } -// Internal class that wraps the HTTP2 request. -export class ServerCall {} - - // Unary response callback signature. export type sendUnaryData = (error: ServiceError|null, value: ResponseType|null, trailer?: Metadata, @@ -116,12 +215,12 @@ export type sendUnaryData = // User provided handler for unary calls. export type handleUnaryCall = - (call: ServerUnaryCall, + (call: ServerUnaryCall, callback: sendUnaryData) => void; // User provided handler for client streaming calls. export type handleClientStreamingCall = - (call: ServerReadableStream, + (call: ServerReadableStream, callback: sendUnaryData) => void; // User provided handler for server streaming calls. @@ -133,9 +232,9 @@ export type handleBidiStreamingCall = (call: ServerDuplexStream) => void; export type HandleCall = - handleUnaryCall| - handleClientStreamingCall| - handleServerStreamingCall| + handleUnaryCall& + handleClientStreamingCall& + handleServerStreamingCall& handleBidiStreamingCall; export type Handler = { @@ -146,3 +245,213 @@ export type Handler = { }; export type HandlerType = 'bidi'|'clientStream'|'serverStream'|'unary'; + + +// Internal class that wraps the HTTP2 request. +export class Http2ServerCallStream extends + EventEmitter { + cancelled = false; + deadline: NodeJS.Timer|null = null; + status: PartialServiceError = {code: Status.OK, details: 'OK'}; + + constructor( + private stream: http2.ServerHttp2Stream, + private handler: Handler|null) { + super(); + + this.stream.once('error', (err: Error) => { + this.sendError(err as ServiceError, Status.INTERNAL); + }); + + this.stream.once('close', () => { + if (this.stream.rstCode === http2.constants.NGHTTP2_CANCEL) { + this.cancelled = true; + this.emit('cancelled', 'cancelled'); + } + }); + } + + private get _metadataSent(): boolean { + return this.stream.headersSent; + } + + sendMetadata(customMetadata?: Metadata) { + if (this._metadataSent) { + return; + } + + const custom = customMetadata ? customMetadata.toHttp2Headers() : null; + // TODO(cjihrig): Include compression headers. + const headers = Object.assign(defaultResponseHeaders, custom); + + this.stream.respond(headers, defaultResponseOptions); + this.stream.once('wantTrailers', () => { + let trailersToSend = { + [GRPC_STATUS_HEADER]: this.status.code, + [GRPC_MESSAGE_HEADER]: encodeURI(this.status.details as string) + }; + const metadata = this.status.metadata; + + if (metadata) { + trailersToSend = + Object.assign(trailersToSend, metadata.toHttp2Headers()); + } + + this.stream.sendTrailers(trailersToSend); + }); + } + + receiveMetadata(headers: http2.IncomingHttpHeaders) { + const metadata = Metadata.fromHttp2Headers(headers); + + // TODO(cjihrig): Receive compression metadata. + + const timeoutHeader = metadata.get(GRPC_TIMEOUT_HEADER); + + if (timeoutHeader.length > 0) { + const match = timeoutHeader[0].toString().match(DEADLINE_REGEX); + + if (match === null) { + this.sendError( + new Error('Invalid deadline') as ServiceError, Status.OUT_OF_RANGE); + return; + } + + const timeout = (+match[1] * deadlineUnitsToMs[match[2]]) | 0; + + this.deadline = setTimeout(handleExpiredDeadline, timeout, this); + metadata.remove(GRPC_TIMEOUT_HEADER); + } + + return metadata; + } + + receiveUnaryMessage(): Promise { + return new Promise((resolve, reject) => { + const stream = this.stream; + const chunks: Buffer[] = []; + let totalLength = 0; + + stream.on('data', (data: Buffer) => { + chunks.push(data); + totalLength += data.byteLength; + }); + + stream.once('end', async () => { + try { + const requestBytes = Buffer.concat(chunks, totalLength); + + resolve(await this.deserializeMessage(requestBytes)); + } catch (err) { + this.sendError(err, Status.INTERNAL); + resolve(); + } + }); + }); + } + + serializeMessage(value: ResponseType) { + const handler = this.handler as Handler; + const messageBuffer = handler.serialize(value); + + // TODO(cjihrig): Call compression aware serializeMessage(). + const byteLength = messageBuffer.byteLength; + const output = Buffer.allocUnsafe(byteLength + 5); + output.writeUInt8(0, 0); + output.writeUInt32BE(byteLength, 1); + messageBuffer.copy(output, 5); + return output; + } + + async deserializeMessage(bytes: Buffer) { + const handler = this.handler as Handler; + // TODO(cjihrig): Call compression aware deserializeMessage(). + const receivedMessage = bytes.slice(5); + + return handler.deserialize(receivedMessage); + } + + async sendUnaryMessage( + err: ServiceError|null, value: ResponseType|null, metadata?: Metadata, + flags?: number) { + if (err) { + if (metadata) { + err.metadata = metadata; + } + + this.sendError(err); + return; + } + + try { + const response = await this.serializeMessage(value as ResponseType); + + if (metadata) { + this.status.metadata = metadata; + } + + this.end(response); + } catch (err) { + this.sendError(err, Status.INTERNAL); + } + } + + sendError(error: ServiceError, code = Status.UNKNOWN) { + const {status} = this; + + if (error.hasOwnProperty('message')) { + status.details = error.message; + } else { + status.details = 'Unknown Error'; + } + + if (error.hasOwnProperty('code') && Number.isInteger(error.code)) { + status.code = error.code; + + if (error.hasOwnProperty('details')) { + status.details = error.details; + } + } else { + status.code = code; + } + + if (error.hasOwnProperty('metadata')) { + status.metadata = error.metadata; + } + + this.end(); + } + + write(chunk: Buffer) { + if (this.cancelled === true) { + return; + } + + this.sendMetadata(); + return this.stream.write(chunk); + } + + end(payload?: Buffer) { + if (this.cancelled === true) { + return; + } + + if (this.deadline !== null) { + clearTimeout(this.deadline); + this.deadline = null; + } + + this.sendMetadata(); + return this.stream.end(payload); + } +} + +// tslint:disable:no-any +type UntypedServerCall = Http2ServerCallStream; + +function handleExpiredDeadline(call: UntypedServerCall) { + call.sendError( + new Error('Deadline exceeded') as ServiceError, Status.DEADLINE_EXCEEDED); + call.cancelled = true; + call.emit('cancelled', 'deadline'); +} diff --git a/packages/grpc-js/src/server.ts b/packages/grpc-js/src/server.ts index decdbe5a..567c05e6 100644 --- a/packages/grpc-js/src/server.ts +++ b/packages/grpc-js/src/server.ts @@ -22,12 +22,12 @@ import {URL} from 'url'; import {ServiceError} from './call'; import {Status} from './constants'; import {Deserialize, Serialize, ServiceDefinition} from './make-client'; -import {HandleCall, Handler, HandlerType, sendUnaryData, ServerDuplexStream, ServerReadableStream, ServerUnaryCall, ServerWritableStream} from './server-call'; +import {Metadata} from './metadata'; +import {HandleCall, Handler, HandlerType, Http2ServerCallStream, PartialServiceError, sendUnaryData, ServerDuplexStream, ServerDuplexStreamImpl, ServerReadableStream, ServerReadableStreamImpl, ServerUnaryCall, ServerUnaryCallImpl, ServerWritableStream, ServerWritableStreamImpl} from './server-call'; import {ServerCredentials} from './server-credentials'; function noop(): void {} -type PartialServiceError = Partial; const unimplementedStatusResponse: PartialServiceError = { code: Status.UNIMPLEMENTED, details: 'The server does not implement this method', @@ -41,10 +41,11 @@ type UntypedServiceImplementation = { }; const defaultHandler = { - unary(call: ServerUnaryCall, callback: sendUnaryData): void { + unary(call: ServerUnaryCall, callback: sendUnaryData): void { callback(unimplementedStatusResponse as ServiceError, null); }, - clientStream(call: ServerReadableStream, callback: sendUnaryData): + clientStream( + call: ServerReadableStream, callback: sendUnaryData): void { callback(unimplementedStatusResponse as ServiceError, null); }, @@ -120,8 +121,8 @@ export class Server { } const success = this.register( - attrs.path, impl, attrs.responseSerialize, attrs.requestDeserialize, - methodType); + attrs.path, impl as UntypedHandleCall, attrs.responseSerialize, + attrs.requestDeserialize, methodType); if (success === false) { throw new Error(`Method handler for ${attrs.path} already provided.`); @@ -162,7 +163,7 @@ export class Server { this.http2Server = http2.createServer(); } - // TODO(cjihrig): Set up the handlers, to allow requests to be processed. + this._setupHandlers(); function onError(err: Error): void { callback(err, -1); @@ -227,4 +228,107 @@ export class Server { addHttp2Port(): void { throw new Error('Not yet implemented'); } + + private _setupHandlers(): void { + if (this.http2Server === null) { + return; + } + + this.http2Server.on( + 'stream', + (stream: http2.ServerHttp2Stream, + headers: http2.IncomingHttpHeaders) => { + if (this.started !== true) { + stream.end(); + return; + } + + try { + const path = headers[http2.constants.HTTP2_HEADER_PATH] as string; + const handler = this.handlers.get(path); + + if (handler === undefined) { + throw unimplementedStatusResponse; + } + + const call = new Http2ServerCallStream(stream, handler); + const metadata: Metadata = + call.receiveMetadata(headers) as Metadata; + + switch (handler.type) { + case 'unary': + handleUnary(call, handler, metadata); + break; + case 'clientStream': + handleClientStreaming(call, handler, metadata); + break; + case 'serverStream': + handleServerStreaming(call, handler, metadata); + break; + case 'bidi': + handleBidiStreaming(call, handler, metadata); + break; + default: + throw new Error(`Unknown handler type: ${handler.type}`); + } + } catch (err) { + const call = new Http2ServerCallStream(stream, null); + call.sendError(err, Status.INTERNAL); + } + }); + } +} + + +async function handleUnary( + call: Http2ServerCallStream, + handler: Handler, + metadata: Metadata): Promise { + const emitter = + new ServerUnaryCallImpl(call, metadata); + const request = await call.receiveUnaryMessage(); + + if (request === undefined || call.cancelled === true) { + return; + } + + emitter.request = request; + handler.func( + emitter, + (err: ServiceError|null, value: ResponseType|null, trailer?: Metadata, + flags?: number) => { + call.sendUnaryMessage(err, value, trailer, flags); + }); +} + + +function handleClientStreaming( + call: Http2ServerCallStream, + handler: Handler, metadata: Metadata): void { + throw new Error('not implemented yet'); +} + + +async function handleServerStreaming( + call: Http2ServerCallStream, + handler: Handler, + metadata: Metadata): Promise { + const request = await call.receiveUnaryMessage(); + + if (request === undefined || call.cancelled === true) { + return; + } + + const stream = new ServerWritableStreamImpl( + call, metadata, handler.serialize); + + stream.request = request; + handler.func(stream); +} + + +function handleBidiStreaming( + call: Http2ServerCallStream, + handler: Handler, metadata: Metadata): void { + throw new Error('not implemented yet'); } diff --git a/packages/grpc-js/test/fixtures/echo_service.proto b/packages/grpc-js/test/fixtures/echo_service.proto new file mode 100644 index 00000000..20b3bfe0 --- /dev/null +++ b/packages/grpc-js/test/fixtures/echo_service.proto @@ -0,0 +1,33 @@ +/* + * Copyright 2019 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +syntax = "proto3"; + +message EchoMessage { + string value = 1; + int32 value2 = 2; +} + +service EchoService { + rpc Echo (EchoMessage) returns (EchoMessage); + + rpc EchoClientStream (stream EchoMessage) returns (EchoMessage); + + rpc EchoServerStream (EchoMessage) returns (stream EchoMessage); + + rpc EchoBidiStream (stream EchoMessage) returns (stream EchoMessage); +} diff --git a/packages/grpc-js/test/fixtures/test_service.proto b/packages/grpc-js/test/fixtures/test_service.proto new file mode 100644 index 00000000..db876be9 --- /dev/null +++ b/packages/grpc-js/test/fixtures/test_service.proto @@ -0,0 +1,41 @@ +/* + * Copyright 2019 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +syntax = "proto3"; + +message Request { + bool error = 1; + string message = 2; +} + +message Response { + int32 count = 1; +} + +service TestService { + rpc Unary (Request) returns (Response) { + } + + rpc ClientStream (stream Request) returns (Response) { + } + + rpc ServerStream (Request) returns (stream Response) { + } + + rpc BidiStream (stream Request) returns (stream Response) { + } +} diff --git a/packages/grpc-js/test/test-server-errors.ts b/packages/grpc-js/test/test-server-errors.ts new file mode 100644 index 00000000..eabae3aa --- /dev/null +++ b/packages/grpc-js/test/test-server-errors.ts @@ -0,0 +1,522 @@ +/* + * Copyright 2019 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +// Allow `any` data type for testing runtime type checking. +// tslint:disable no-any +import * as assert from 'assert'; +import {join} from 'path'; + +import * as grpc from '../src'; +import {ServiceError} from '../src/call'; +import {ServiceClient, ServiceClientConstructor} from '../src/make-client'; +import {Server} from '../src/server'; +import {sendUnaryData, ServerDuplexStream, ServerReadableStream, ServerUnaryCall, ServerWritableStream} from '../src/server-call'; + +import {loadProtoFile} from './common'; + +const protoFile = join(__dirname, 'fixtures', 'test_service.proto'); +const testServiceDef = loadProtoFile(protoFile); +const testServiceClient = + testServiceDef.TestService as ServiceClientConstructor; +const clientInsecureCreds = grpc.credentials.createInsecure(); +const serverInsecureCreds = grpc.ServerCredentials.createInsecure(); + + +describe('Client malformed response handling', () => { + let server: Server; + let client: ServiceClient; + const badArg = Buffer.from([0xFF]); + + before((done) => { + const malformedTestService = { + unary: { + path: '/TestService/Unary', + requestStream: false, + responseStream: false, + requestDeserialize: identity, + responseSerialize: identity + }, + clientStream: { + path: '/TestService/ClientStream', + requestStream: true, + responseStream: false, + requestDeserialize: identity, + responseSerialize: identity + }, + serverStream: { + path: '/TestService/ServerStream', + requestStream: false, + responseStream: true, + requestDeserialize: identity, + responseSerialize: identity + }, + bidiStream: { + path: '/TestService/BidiStream', + requestStream: true, + responseStream: true, + requestDeserialize: identity, + responseSerialize: identity + } + } as any; + + server = new Server(); + + server.addService(malformedTestService, { + unary(call: ServerUnaryCall, cb: sendUnaryData) { + cb(null, badArg); + }, + + clientStream( + stream: ServerReadableStream, cb: sendUnaryData) { + stream.on('data', noop); + stream.on('end', () => { + cb(null, badArg); + }); + }, + + serverStream(stream: ServerWritableStream) { + stream.write(badArg); + stream.end(); + }, + + bidiStream(stream: ServerDuplexStream) { + stream.on('data', () => { + // Ignore requests + stream.write(badArg); + }); + + stream.on('end', () => { + stream.end(); + }); + } + }); + + server.bindAsync('localhost:0', serverInsecureCreds, (err, port) => { + assert.ifError(err); + client = new testServiceClient(`localhost:${port}`, clientInsecureCreds); + server.start(); + done(); + }); + }); + + after((done) => { + client.close(); + server.tryShutdown(done); + }); + + it('should get an INTERNAL status with a unary call', (done) => { + client.unary({}, (err: ServiceError, data: any) => { + assert(err); + assert.strictEqual(err.code, grpc.status.INTERNAL); + done(); + }); + }); + + it('should get an INTERNAL status with a server stream call', (done) => { + const call = client.serverStream({}); + + call.on('data', noop); + call.on('error', (err: ServiceError) => { + assert(err); + assert.strictEqual(err.code, grpc.status.INTERNAL); + done(); + }); + }); +}); + +describe('Server serialization failure handling', () => { + let client: ServiceClient; + let server: Server; + + before((done) => { + function serializeFail(obj: any) { + throw new Error('Serialization failed'); + } + + const malformedTestService = { + unary: { + path: '/TestService/Unary', + requestStream: false, + responseStream: false, + requestDeserialize: identity, + responseSerialize: serializeFail + }, + clientStream: { + path: '/TestService/ClientStream', + requestStream: true, + responseStream: false, + requestDeserialize: identity, + responseSerialize: serializeFail + }, + serverStream: { + path: '/TestService/ServerStream', + requestStream: false, + responseStream: true, + requestDeserialize: identity, + responseSerialize: serializeFail + }, + bidiStream: { + path: '/TestService/BidiStream', + requestStream: true, + responseStream: true, + requestDeserialize: identity, + responseSerialize: serializeFail + } + }; + + server = new Server(); + server.addService(malformedTestService as any, { + unary(call: ServerUnaryCall, cb: sendUnaryData) { + cb(null, {}); + }, + + clientStream( + stream: ServerReadableStream, cb: sendUnaryData) { + stream.on('data', noop); + stream.on('end', () => { + cb(null, {}); + }); + }, + + serverStream(stream: ServerWritableStream) { + stream.write({}); + stream.end(); + }, + + bidiStream(stream: ServerDuplexStream) { + stream.on('data', () => { + // Ignore requests + stream.write({}); + }); + stream.on('end', () => { + stream.end(); + }); + } + }); + + server.bindAsync('localhost:0', serverInsecureCreds, (err, port) => { + assert.ifError(err); + client = new testServiceClient(`localhost:${port}`, clientInsecureCreds); + server.start(); + done(); + }); + }); + + after((done) => { + client.close(); + server.tryShutdown(done); + }); + + it('should get an INTERNAL status with a unary call', (done) => { + client.unary({}, (err: ServiceError, data: any) => { + assert(err); + assert.strictEqual(err.code, grpc.status.INTERNAL); + done(); + }); + }); + + it('should get an INTERNAL status with a server stream call', (done) => { + const call = client.serverStream({}); + + call.on('data', noop); + call.on('error', (err: ServiceError) => { + assert(err); + assert.strictEqual(err.code, grpc.status.INTERNAL); + done(); + }); + }); +}); + + +describe('Other conditions', () => { + let client: ServiceClient; + let server: Server; + let port: number; + + before((done) => { + const trailerMetadata = new grpc.Metadata(); + + server = new Server(); + trailerMetadata.add('trailer-present', 'yes'); + + server.addService(testServiceClient.service, { + unary(call: ServerUnaryCall, cb: sendUnaryData) { + const req = call.request; + + if (req.error) { + const details = req.message || 'Requested error'; + + cb({code: grpc.status.UNKNOWN, details} as ServiceError, null, + trailerMetadata); + } else { + cb(null, {count: 1}, trailerMetadata); + } + }, + + clientStream( + stream: ServerReadableStream, cb: sendUnaryData) { + let count = 0; + let errored = false; + + stream.on('data', (data: any) => { + if (data.error) { + const message = data.message || 'Requested error'; + errored = true; + cb(new Error(message) as ServiceError, null, trailerMetadata); + } else { + count++; + } + }); + + stream.on('end', () => { + if (!errored) { + cb(null, {count}, trailerMetadata); + } + }); + }, + + serverStream(stream: ServerWritableStream) { + const req = stream.request; + + if (req.error) { + stream.emit('error', { + code: grpc.status.UNKNOWN, + details: req.message || 'Requested error', + metadata: trailerMetadata + }); + } else { + for (let i = 0; i < 5; i++) { + stream.write({count: i}); + } + + stream.end(trailerMetadata); + } + }, + + bidiStream(stream: ServerDuplexStream) { + let count = 0; + stream.on('data', (data: any) => { + if (data.error) { + const message = data.message || 'Requested error'; + const err = new Error(message) as ServiceError; + + err.metadata = trailerMetadata.clone(); + err.metadata.add('count', '' + count); + stream.emit('error', err); + } else { + stream.write({count}); + count++; + } + }); + + stream.on('end', () => { + stream.end(trailerMetadata); + }); + } + }); + + server.bindAsync('localhost:0', serverInsecureCreds, (err, _port) => { + assert.ifError(err); + port = _port; + client = new testServiceClient(`localhost:${port}`, clientInsecureCreds); + server.start(); + done(); + }); + }); + + after((done) => { + client.close(); + server.tryShutdown(done); + }); + + describe('Server receiving bad input', () => { + let misbehavingClient: ServiceClient; + const badArg = Buffer.from([0xFF]); + + before(() => { + const testServiceAttrs = { + unary: { + path: '/TestService/Unary', + requestStream: false, + responseStream: false, + requestSerialize: identity, + responseDeserialize: identity + }, + clientStream: { + path: '/TestService/ClientStream', + requestStream: true, + responseStream: false, + requestSerialize: identity, + responseDeserialize: identity + }, + serverStream: { + path: '/TestService/ServerStream', + requestStream: false, + responseStream: true, + requestSerialize: identity, + responseDeserialize: identity + }, + bidiStream: { + path: '/TestService/BidiStream', + requestStream: true, + responseStream: true, + requestSerialize: identity, + responseDeserialize: identity + } + } as any; + + const client = + grpc.makeGenericClientConstructor(testServiceAttrs, 'TestService'); + + misbehavingClient = new client(`localhost:${port}`, clientInsecureCreds); + }); + + after(() => { + misbehavingClient.close(); + }); + + it('should respond correctly to a unary call', (done) => { + misbehavingClient.unary(badArg, (err: ServiceError, data: any) => { + assert(err); + assert.strictEqual(err.code, grpc.status.INTERNAL); + done(); + }); + }); + + it('should respond correctly to a server stream', (done) => { + const call = misbehavingClient.serverStream(badArg); + + call.on('data', (data: any) => { + assert.fail(data); + }); + + call.on('error', (err: ServiceError) => { + assert(err); + assert.strictEqual(err.code, grpc.status.INTERNAL); + done(); + }); + }); + }); + + describe('Trailing metadata', () => { + it('should be present when a unary call succeeds', (done) => { + let count = 0; + const call = + client.unary({error: false}, (err: ServiceError, data: any) => { + assert.ifError(err); + + count++; + if (count === 2) { + done(); + } + }); + + call.on('status', (status: grpc.StatusObject) => { + assert.deepStrictEqual(status.metadata.get('trailer-present'), ['yes']); + + count++; + if (count === 2) { + done(); + } + }); + }); + + it('should be present when a unary call fails', (done) => { + let count = 0; + const call = + client.unary({error: true}, (err: ServiceError, data: any) => { + assert(err); + + count++; + if (count === 2) { + done(); + } + }); + + call.on('status', (status: grpc.StatusObject) => { + assert.deepStrictEqual(status.metadata.get('trailer-present'), ['yes']); + + count++; + if (count === 2) { + done(); + } + }); + }); + + it('should be present when a server stream call succeeds', (done) => { + const call = client.serverStream({error: false}); + + call.on('data', noop); + call.on('status', (status: grpc.StatusObject) => { + assert.strictEqual(status.code, grpc.status.OK); + assert.deepStrictEqual(status.metadata.get('trailer-present'), ['yes']); + done(); + }); + }); + + it('should be present when a server stream call fails', (done) => { + const call = client.serverStream({error: true}); + + call.on('data', noop); + call.on('error', (error: ServiceError) => { + assert.deepStrictEqual(error.metadata.get('trailer-present'), ['yes']); + done(); + }); + }); + }); + + describe('Error object should contain the status', () => { + it('for a unary call', (done) => { + client.unary({error: true}, (err: ServiceError, data: any) => { + assert(err); + assert.strictEqual(err.code, grpc.status.UNKNOWN); + assert.strictEqual(err.details, 'Requested error'); + done(); + }); + }); + + it('for a server stream call', (done) => { + const call = client.serverStream({error: true}); + + call.on('data', noop); + call.on('error', (error: ServiceError) => { + assert.strictEqual(error.code, grpc.status.UNKNOWN); + assert.strictEqual(error.details, 'Requested error'); + done(); + }); + }); + + it('for a UTF-8 error message', (done) => { + client.unary( + {error: true, message: '測試字符串'}, + (err: ServiceError, data: any) => { + assert(err); + assert.strictEqual(err.code, grpc.status.UNKNOWN); + assert.strictEqual(err.details, '測試字符串'); + done(); + }); + }); + }); +}); + + +function identity(arg: any): any { + return arg; +} + + +function noop(): void {} diff --git a/packages/grpc-js/test/test-server.ts b/packages/grpc-js/test/test-server.ts index 0b2f0524..f44f5d5d 100644 --- a/packages/grpc-js/test/test-server.ts +++ b/packages/grpc-js/test/test-server.ts @@ -21,8 +21,12 @@ import * as assert from 'assert'; import * as fs from 'fs'; import * as path from 'path'; +import * as grpc from '../src'; import {ServerCredentials} from '../src'; +import {ServiceError} from '../src/call'; +import {ServiceClient, ServiceClientConstructor} from '../src/make-client'; import {Server} from '../src/server'; +import {sendUnaryData, ServerDuplexStream, ServerReadableStream, ServerUnaryCall, ServerWritableStream} from '../src/server-call'; import {loadProtoFile} from './common'; @@ -228,4 +232,158 @@ describe('Server', () => { server.bind('localhost:0', ServerCredentials.createInsecure()); }, /Not implemented. Use bindAsync\(\) instead/); }); + + describe('Default handlers', () => { + let server: Server; + let client: ServiceClient; + + const mathProtoFile = path.join(__dirname, 'fixtures', 'math.proto'); + const mathClient = (loadProtoFile(mathProtoFile).math as any).Math; + const mathServiceAttrs = mathClient.service; + + beforeEach((done) => { + server = new Server(); + server.addService(mathServiceAttrs, {}); + server.bindAsync( + 'localhost:0', ServerCredentials.createInsecure(), (err, port) => { + assert.ifError(err); + client = new mathClient( + `localhost:${port}`, grpc.credentials.createInsecure()); + server.start(); + done(); + }); + }); + + it('should respond to a unary call with UNIMPLEMENTED', (done) => { + client.div( + {divisor: 4, dividend: 3}, (error: ServiceError, response: any) => { + assert(error); + assert.strictEqual(error.code, grpc.status.UNIMPLEMENTED); + done(); + }); + }); + + it('should respond to a server stream with UNIMPLEMENTED', (done) => { + const call = client.fib({limit: 5}); + + call.on('data', (value: any) => { + assert.fail('No messages expected'); + }); + + call.on('error', (err: ServiceError) => { + assert(err); + assert.strictEqual(err.code, grpc.status.UNIMPLEMENTED); + done(); + }); + }); + }); +}); + +describe('Echo service', () => { + let server: Server; + let client: ServiceClient; + + before((done) => { + const protoFile = path.join(__dirname, 'fixtures', 'echo_service.proto'); + const echoService = + loadProtoFile(protoFile).EchoService as ServiceClientConstructor; + + server = new Server(); + server.addService(echoService.service, { + echo(call: ServerUnaryCall, callback: sendUnaryData) { + callback(null, call.request); + } + }); + + server.bindAsync( + 'localhost:0', ServerCredentials.createInsecure(), (err, port) => { + assert.ifError(err); + client = new echoService( + `localhost:${port}`, grpc.credentials.createInsecure()); + server.start(); + done(); + }); + }); + + after((done) => { + client.close(); + server.tryShutdown(done); + }); + + it('should echo the recieved message directly', (done) => { + client.echo( + {value: 'test value', value2: 3}, + (error: ServiceError, response: any) => { + assert.ifError(error); + assert.deepStrictEqual(response, {value: 'test value', value2: 3}); + done(); + }); + }); +}); + +describe('Generic client and server', () => { + function toString(val: any) { + return val.toString(); + } + + function toBuffer(str: string) { + return Buffer.from(str); + } + + function capitalize(str: string) { + return str.charAt(0).toUpperCase() + str.slice(1); + } + + const stringServiceAttrs = { + capitalize: { + path: '/string/capitalize', + requestStream: false, + responseStream: false, + requestSerialize: toBuffer, + requestDeserialize: toString, + responseSerialize: toBuffer, + responseDeserialize: toString + } + }; + + describe('String client and server', () => { + let client: ServiceClient; + let server: Server; + + before((done) => { + server = new Server(); + + server.addService(stringServiceAttrs as any, { + capitalize( + call: ServerUnaryCall, callback: sendUnaryData) { + callback(null, capitalize(call.request)); + } + }); + + server.bindAsync( + 'localhost:0', ServerCredentials.createInsecure(), (err, port) => { + assert.ifError(err); + server.start(); + const clientConstr = grpc.makeGenericClientConstructor( + stringServiceAttrs as any, + 'unused_but_lets_appease_typescript_anyway'); + client = new clientConstr( + `localhost:${port}`, grpc.credentials.createInsecure()); + done(); + }); + }); + + after((done) => { + client.close(); + server.tryShutdown(done); + }); + + it('Should respond with a capitalized string', (done) => { + client.capitalize('abc', (err: ServiceError, response: string) => { + assert.ifError(err); + assert.strictEqual(response, 'Abc'); + done(); + }); + }); + }); });