mirror of
https://github.com/grpc/grpc-node.git
synced 2025-12-08 18:23:54 +00:00
grpc-js: support unary and server streaming rpcs
This commit adds support for unary and server streaming RPCs.
This commit is contained in:
parent
fe82be7f34
commit
79544366be
@ -16,62 +16,74 @@
|
||||
*/
|
||||
|
||||
import {EventEmitter} from 'events';
|
||||
import * as http2 from 'http2';
|
||||
import {Duplex, Readable, Writable} from 'stream';
|
||||
|
||||
import {ServiceError} from './call';
|
||||
import {Status} from './constants';
|
||||
import {Deserialize, Serialize} from './make-client';
|
||||
import {Metadata} from './metadata';
|
||||
|
||||
function noop(): void {}
|
||||
|
||||
export class ServerUnaryCall<RequestType> extends EventEmitter {
|
||||
cancelled: boolean;
|
||||
request: RequestType|null;
|
||||
export type PartialServiceError = Partial<ServiceError>;
|
||||
|
||||
constructor(private call: ServerCall, public metadata: Metadata) {
|
||||
super();
|
||||
this.cancelled = false;
|
||||
this.request = null; // TODO(cjihrig): Read the unary request here.
|
||||
}
|
||||
type DeadlineUnitIndexSignature = {
|
||||
[name: string]: number
|
||||
};
|
||||
|
||||
getPeer(): string {
|
||||
throw new Error('not implemented yet');
|
||||
}
|
||||
|
||||
sendMetadata(responseMetadata: Metadata): void {
|
||||
throw new Error('not implemented yet');
|
||||
}
|
||||
}
|
||||
const GRPC_ACCEPT_ENCODING_HEADER = 'grpc-accept-encoding';
|
||||
const GRPC_ENCODING_HEADER = 'grpc-encoding';
|
||||
const GRPC_MESSAGE_HEADER = 'grpc-message';
|
||||
const GRPC_STATUS_HEADER = 'grpc-status';
|
||||
const GRPC_TIMEOUT_HEADER = 'grpc-timeout';
|
||||
const DEADLINE_REGEX = /(\d{1,8})\s*([HMSmun])/;
|
||||
const deadlineUnitsToMs: DeadlineUnitIndexSignature = {
|
||||
H: 3600000,
|
||||
M: 60000,
|
||||
S: 1000,
|
||||
m: 1,
|
||||
u: 0.001,
|
||||
n: 0.000001
|
||||
};
|
||||
const defaultResponseHeaders = {
|
||||
// TODO(cjihrig): Remove these encoding headers from the default response
|
||||
// once compression is integrated.
|
||||
[GRPC_ACCEPT_ENCODING_HEADER]: 'identity',
|
||||
[GRPC_ENCODING_HEADER]: 'identity',
|
||||
[http2.constants.HTTP2_HEADER_STATUS]: http2.constants.HTTP_STATUS_OK,
|
||||
[http2.constants.HTTP2_HEADER_CONTENT_TYPE]: 'application/grpc+proto'
|
||||
};
|
||||
const defaultResponseOptions = {
|
||||
waitForTrailers: true
|
||||
} as http2.ServerStreamResponseOptions;
|
||||
|
||||
|
||||
export class ServerReadableStream<RequestType> extends Readable {
|
||||
cancelled: boolean;
|
||||
export type ServerSurfaceCall = {
|
||||
cancelled: boolean; getPeer(): string;
|
||||
sendMetadata(responseMetadata: Metadata): void
|
||||
};
|
||||
|
||||
constructor(
|
||||
private call: ServerCall, public metadata: Metadata,
|
||||
private deserialize: Deserialize<RequestType>) {
|
||||
super();
|
||||
this.cancelled = false;
|
||||
}
|
||||
export type ServerUnaryCall<RequestType, ResponseType> =
|
||||
ServerSurfaceCall&{request: RequestType | null};
|
||||
export type ServerReadableStream<RequestType, ResponseType> =
|
||||
ServerSurfaceCall&Readable;
|
||||
export type ServerWritableStream<RequestType, ResponseType> =
|
||||
ServerSurfaceCall&Writable&{request: RequestType | null};
|
||||
export type ServerDuplexStream<RequestType, ResponseType> =
|
||||
ServerSurfaceCall&Duplex;
|
||||
|
||||
getPeer(): string {
|
||||
throw new Error('not implemented yet');
|
||||
}
|
||||
|
||||
sendMetadata(responseMetadata: Metadata): void {
|
||||
throw new Error('not implemented yet');
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
export class ServerWritableStream<RequestType, ResponseType> extends Writable {
|
||||
export class ServerUnaryCallImpl<RequestType, ResponseType> extends EventEmitter
|
||||
implements ServerUnaryCall<RequestType, ResponseType> {
|
||||
cancelled: boolean;
|
||||
request: RequestType|null;
|
||||
|
||||
constructor(
|
||||
private call: ServerCall, public metadata: Metadata,
|
||||
private serialize: Serialize<ResponseType>) {
|
||||
private call: Http2ServerCallStream<RequestType, ResponseType>,
|
||||
public metadata: Metadata) {
|
||||
super();
|
||||
this.cancelled = false;
|
||||
this.request = null; // TODO(cjihrig): Read the unary request here.
|
||||
this.request = null;
|
||||
}
|
||||
|
||||
getPeer(): string {
|
||||
@ -79,18 +91,109 @@ export class ServerWritableStream<RequestType, ResponseType> extends Writable {
|
||||
}
|
||||
|
||||
sendMetadata(responseMetadata: Metadata): void {
|
||||
throw new Error('not implemented yet');
|
||||
this.call.sendMetadata(responseMetadata);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
export class ServerDuplexStream<RequestType, ResponseType> extends Duplex {
|
||||
export class ServerReadableStreamImpl<RequestType, ResponseType> extends
|
||||
Readable implements ServerReadableStream<RequestType, ResponseType> {
|
||||
cancelled: boolean;
|
||||
private done = false;
|
||||
|
||||
constructor(
|
||||
private call: Http2ServerCallStream<RequestType, ResponseType>,
|
||||
public metadata: Metadata,
|
||||
private _deserialize: Deserialize<RequestType>) {
|
||||
super();
|
||||
this.cancelled = false;
|
||||
}
|
||||
|
||||
getPeer(): string {
|
||||
throw new Error('not implemented yet');
|
||||
}
|
||||
|
||||
sendMetadata(responseMetadata: Metadata): void {
|
||||
this.call.sendMetadata(responseMetadata);
|
||||
}
|
||||
|
||||
_done(): void {
|
||||
this.done = true;
|
||||
this.on('data', noop);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
export class ServerWritableStreamImpl<RequestType, ResponseType> extends
|
||||
Writable implements ServerWritableStream<RequestType, ResponseType> {
|
||||
cancelled: boolean;
|
||||
request: RequestType|null;
|
||||
|
||||
constructor(
|
||||
private call: Http2ServerCallStream<RequestType, ResponseType>,
|
||||
public metadata: Metadata, private _serialize: Serialize<ResponseType>) {
|
||||
super({objectMode: true});
|
||||
this.cancelled = false;
|
||||
this.request = null;
|
||||
|
||||
this.on('error', (err) => {
|
||||
this.call.sendError(err as ServiceError);
|
||||
this.end();
|
||||
});
|
||||
}
|
||||
|
||||
getPeer(): string {
|
||||
throw new Error('not implemented yet');
|
||||
}
|
||||
|
||||
sendMetadata(responseMetadata: Metadata): void {
|
||||
this.call.sendMetadata(responseMetadata);
|
||||
}
|
||||
|
||||
async _write(chunk: ResponseType, encoding: string, callback: Function) {
|
||||
try {
|
||||
const response = await this.call.serializeMessage(chunk);
|
||||
this.call.write(response);
|
||||
} catch (err) {
|
||||
err.code = Status.INTERNAL;
|
||||
this.emit('error', err);
|
||||
}
|
||||
|
||||
callback(null);
|
||||
}
|
||||
|
||||
_final(callback: Function): void {
|
||||
this.call.end();
|
||||
callback(null);
|
||||
}
|
||||
|
||||
// tslint:disable-next-line:no-any
|
||||
end(metadata?: any) {
|
||||
if (metadata) {
|
||||
this.call.status.metadata = metadata;
|
||||
}
|
||||
|
||||
super.end();
|
||||
}
|
||||
|
||||
serialize(input: ResponseType): Buffer|null {
|
||||
if (input === null || input === undefined) {
|
||||
return null;
|
||||
}
|
||||
|
||||
return this._serialize(input);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
export class ServerDuplexStreamImpl<RequestType, ResponseType> extends Duplex
|
||||
implements ServerDuplexStream<RequestType, ResponseType> {
|
||||
cancelled: boolean;
|
||||
|
||||
constructor(
|
||||
private call: ServerCall, public metadata: Metadata,
|
||||
private serialize: Serialize<ResponseType>,
|
||||
private deserialize: Deserialize<RequestType>) {
|
||||
private call: Http2ServerCallStream<RequestType, ResponseType>,
|
||||
public metadata: Metadata, private _serialize: Serialize<ResponseType>,
|
||||
private _deserialize: Deserialize<RequestType>) {
|
||||
super();
|
||||
this.cancelled = false;
|
||||
}
|
||||
@ -100,15 +203,11 @@ export class ServerDuplexStream<RequestType, ResponseType> extends Duplex {
|
||||
}
|
||||
|
||||
sendMetadata(responseMetadata: Metadata): void {
|
||||
throw new Error('not implemented yet');
|
||||
this.call.sendMetadata(responseMetadata);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
// Internal class that wraps the HTTP2 request.
|
||||
export class ServerCall {}
|
||||
|
||||
|
||||
// Unary response callback signature.
|
||||
export type sendUnaryData<ResponseType> =
|
||||
(error: ServiceError|null, value: ResponseType|null, trailer?: Metadata,
|
||||
@ -116,12 +215,12 @@ export type sendUnaryData<ResponseType> =
|
||||
|
||||
// User provided handler for unary calls.
|
||||
export type handleUnaryCall<RequestType, ResponseType> =
|
||||
(call: ServerUnaryCall<RequestType>,
|
||||
(call: ServerUnaryCall<RequestType, ResponseType>,
|
||||
callback: sendUnaryData<ResponseType>) => void;
|
||||
|
||||
// User provided handler for client streaming calls.
|
||||
export type handleClientStreamingCall<RequestType, ResponseType> =
|
||||
(call: ServerReadableStream<RequestType>,
|
||||
(call: ServerReadableStream<RequestType, ResponseType>,
|
||||
callback: sendUnaryData<ResponseType>) => void;
|
||||
|
||||
// User provided handler for server streaming calls.
|
||||
@ -133,9 +232,9 @@ export type handleBidiStreamingCall<RequestType, ResponseType> =
|
||||
(call: ServerDuplexStream<RequestType, ResponseType>) => void;
|
||||
|
||||
export type HandleCall<RequestType, ResponseType> =
|
||||
handleUnaryCall<RequestType, ResponseType>|
|
||||
handleClientStreamingCall<RequestType, ResponseType>|
|
||||
handleServerStreamingCall<RequestType, ResponseType>|
|
||||
handleUnaryCall<RequestType, ResponseType>&
|
||||
handleClientStreamingCall<RequestType, ResponseType>&
|
||||
handleServerStreamingCall<RequestType, ResponseType>&
|
||||
handleBidiStreamingCall<RequestType, ResponseType>;
|
||||
|
||||
export type Handler<RequestType, ResponseType> = {
|
||||
@ -146,3 +245,213 @@ export type Handler<RequestType, ResponseType> = {
|
||||
};
|
||||
|
||||
export type HandlerType = 'bidi'|'clientStream'|'serverStream'|'unary';
|
||||
|
||||
|
||||
// Internal class that wraps the HTTP2 request.
|
||||
export class Http2ServerCallStream<RequestType, ResponseType> extends
|
||||
EventEmitter {
|
||||
cancelled = false;
|
||||
deadline: NodeJS.Timer|null = null;
|
||||
status: PartialServiceError = {code: Status.OK, details: 'OK'};
|
||||
|
||||
constructor(
|
||||
private stream: http2.ServerHttp2Stream,
|
||||
private handler: Handler<RequestType, ResponseType>|null) {
|
||||
super();
|
||||
|
||||
this.stream.once('error', (err: Error) => {
|
||||
this.sendError(err as ServiceError, Status.INTERNAL);
|
||||
});
|
||||
|
||||
this.stream.once('close', () => {
|
||||
if (this.stream.rstCode === http2.constants.NGHTTP2_CANCEL) {
|
||||
this.cancelled = true;
|
||||
this.emit('cancelled', 'cancelled');
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
private get _metadataSent(): boolean {
|
||||
return this.stream.headersSent;
|
||||
}
|
||||
|
||||
sendMetadata(customMetadata?: Metadata) {
|
||||
if (this._metadataSent) {
|
||||
return;
|
||||
}
|
||||
|
||||
const custom = customMetadata ? customMetadata.toHttp2Headers() : null;
|
||||
// TODO(cjihrig): Include compression headers.
|
||||
const headers = Object.assign(defaultResponseHeaders, custom);
|
||||
|
||||
this.stream.respond(headers, defaultResponseOptions);
|
||||
this.stream.once('wantTrailers', () => {
|
||||
let trailersToSend = {
|
||||
[GRPC_STATUS_HEADER]: this.status.code,
|
||||
[GRPC_MESSAGE_HEADER]: encodeURI(this.status.details as string)
|
||||
};
|
||||
const metadata = this.status.metadata;
|
||||
|
||||
if (metadata) {
|
||||
trailersToSend =
|
||||
Object.assign(trailersToSend, metadata.toHttp2Headers());
|
||||
}
|
||||
|
||||
this.stream.sendTrailers(trailersToSend);
|
||||
});
|
||||
}
|
||||
|
||||
receiveMetadata(headers: http2.IncomingHttpHeaders) {
|
||||
const metadata = Metadata.fromHttp2Headers(headers);
|
||||
|
||||
// TODO(cjihrig): Receive compression metadata.
|
||||
|
||||
const timeoutHeader = metadata.get(GRPC_TIMEOUT_HEADER);
|
||||
|
||||
if (timeoutHeader.length > 0) {
|
||||
const match = timeoutHeader[0].toString().match(DEADLINE_REGEX);
|
||||
|
||||
if (match === null) {
|
||||
this.sendError(
|
||||
new Error('Invalid deadline') as ServiceError, Status.OUT_OF_RANGE);
|
||||
return;
|
||||
}
|
||||
|
||||
const timeout = (+match[1] * deadlineUnitsToMs[match[2]]) | 0;
|
||||
|
||||
this.deadline = setTimeout(handleExpiredDeadline, timeout, this);
|
||||
metadata.remove(GRPC_TIMEOUT_HEADER);
|
||||
}
|
||||
|
||||
return metadata;
|
||||
}
|
||||
|
||||
receiveUnaryMessage(): Promise<RequestType> {
|
||||
return new Promise((resolve, reject) => {
|
||||
const stream = this.stream;
|
||||
const chunks: Buffer[] = [];
|
||||
let totalLength = 0;
|
||||
|
||||
stream.on('data', (data: Buffer) => {
|
||||
chunks.push(data);
|
||||
totalLength += data.byteLength;
|
||||
});
|
||||
|
||||
stream.once('end', async () => {
|
||||
try {
|
||||
const requestBytes = Buffer.concat(chunks, totalLength);
|
||||
|
||||
resolve(await this.deserializeMessage(requestBytes));
|
||||
} catch (err) {
|
||||
this.sendError(err, Status.INTERNAL);
|
||||
resolve();
|
||||
}
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
serializeMessage(value: ResponseType) {
|
||||
const handler = this.handler as Handler<RequestType, ResponseType>;
|
||||
const messageBuffer = handler.serialize(value);
|
||||
|
||||
// TODO(cjihrig): Call compression aware serializeMessage().
|
||||
const byteLength = messageBuffer.byteLength;
|
||||
const output = Buffer.allocUnsafe(byteLength + 5);
|
||||
output.writeUInt8(0, 0);
|
||||
output.writeUInt32BE(byteLength, 1);
|
||||
messageBuffer.copy(output, 5);
|
||||
return output;
|
||||
}
|
||||
|
||||
async deserializeMessage(bytes: Buffer) {
|
||||
const handler = this.handler as Handler<RequestType, ResponseType>;
|
||||
// TODO(cjihrig): Call compression aware deserializeMessage().
|
||||
const receivedMessage = bytes.slice(5);
|
||||
|
||||
return handler.deserialize(receivedMessage);
|
||||
}
|
||||
|
||||
async sendUnaryMessage(
|
||||
err: ServiceError|null, value: ResponseType|null, metadata?: Metadata,
|
||||
flags?: number) {
|
||||
if (err) {
|
||||
if (metadata) {
|
||||
err.metadata = metadata;
|
||||
}
|
||||
|
||||
this.sendError(err);
|
||||
return;
|
||||
}
|
||||
|
||||
try {
|
||||
const response = await this.serializeMessage(value as ResponseType);
|
||||
|
||||
if (metadata) {
|
||||
this.status.metadata = metadata;
|
||||
}
|
||||
|
||||
this.end(response);
|
||||
} catch (err) {
|
||||
this.sendError(err, Status.INTERNAL);
|
||||
}
|
||||
}
|
||||
|
||||
sendError(error: ServiceError, code = Status.UNKNOWN) {
|
||||
const {status} = this;
|
||||
|
||||
if (error.hasOwnProperty('message')) {
|
||||
status.details = error.message;
|
||||
} else {
|
||||
status.details = 'Unknown Error';
|
||||
}
|
||||
|
||||
if (error.hasOwnProperty('code') && Number.isInteger(error.code)) {
|
||||
status.code = error.code;
|
||||
|
||||
if (error.hasOwnProperty('details')) {
|
||||
status.details = error.details;
|
||||
}
|
||||
} else {
|
||||
status.code = code;
|
||||
}
|
||||
|
||||
if (error.hasOwnProperty('metadata')) {
|
||||
status.metadata = error.metadata;
|
||||
}
|
||||
|
||||
this.end();
|
||||
}
|
||||
|
||||
write(chunk: Buffer) {
|
||||
if (this.cancelled === true) {
|
||||
return;
|
||||
}
|
||||
|
||||
this.sendMetadata();
|
||||
return this.stream.write(chunk);
|
||||
}
|
||||
|
||||
end(payload?: Buffer) {
|
||||
if (this.cancelled === true) {
|
||||
return;
|
||||
}
|
||||
|
||||
if (this.deadline !== null) {
|
||||
clearTimeout(this.deadline);
|
||||
this.deadline = null;
|
||||
}
|
||||
|
||||
this.sendMetadata();
|
||||
return this.stream.end(payload);
|
||||
}
|
||||
}
|
||||
|
||||
// tslint:disable:no-any
|
||||
type UntypedServerCall = Http2ServerCallStream<any, any>;
|
||||
|
||||
function handleExpiredDeadline(call: UntypedServerCall) {
|
||||
call.sendError(
|
||||
new Error('Deadline exceeded') as ServiceError, Status.DEADLINE_EXCEEDED);
|
||||
call.cancelled = true;
|
||||
call.emit('cancelled', 'deadline');
|
||||
}
|
||||
|
||||
@ -22,12 +22,12 @@ import {URL} from 'url';
|
||||
import {ServiceError} from './call';
|
||||
import {Status} from './constants';
|
||||
import {Deserialize, Serialize, ServiceDefinition} from './make-client';
|
||||
import {HandleCall, Handler, HandlerType, sendUnaryData, ServerDuplexStream, ServerReadableStream, ServerUnaryCall, ServerWritableStream} from './server-call';
|
||||
import {Metadata} from './metadata';
|
||||
import {HandleCall, Handler, HandlerType, Http2ServerCallStream, PartialServiceError, sendUnaryData, ServerDuplexStream, ServerDuplexStreamImpl, ServerReadableStream, ServerReadableStreamImpl, ServerUnaryCall, ServerUnaryCallImpl, ServerWritableStream, ServerWritableStreamImpl} from './server-call';
|
||||
import {ServerCredentials} from './server-credentials';
|
||||
|
||||
function noop(): void {}
|
||||
|
||||
type PartialServiceError = Partial<ServiceError>;
|
||||
const unimplementedStatusResponse: PartialServiceError = {
|
||||
code: Status.UNIMPLEMENTED,
|
||||
details: 'The server does not implement this method',
|
||||
@ -41,10 +41,11 @@ type UntypedServiceImplementation = {
|
||||
};
|
||||
|
||||
const defaultHandler = {
|
||||
unary(call: ServerUnaryCall<any>, callback: sendUnaryData<any>): void {
|
||||
unary(call: ServerUnaryCall<any, any>, callback: sendUnaryData<any>): void {
|
||||
callback(unimplementedStatusResponse as ServiceError, null);
|
||||
},
|
||||
clientStream(call: ServerReadableStream<any>, callback: sendUnaryData<any>):
|
||||
clientStream(
|
||||
call: ServerReadableStream<any, any>, callback: sendUnaryData<any>):
|
||||
void {
|
||||
callback(unimplementedStatusResponse as ServiceError, null);
|
||||
},
|
||||
@ -120,8 +121,8 @@ export class Server {
|
||||
}
|
||||
|
||||
const success = this.register(
|
||||
attrs.path, impl, attrs.responseSerialize, attrs.requestDeserialize,
|
||||
methodType);
|
||||
attrs.path, impl as UntypedHandleCall, attrs.responseSerialize,
|
||||
attrs.requestDeserialize, methodType);
|
||||
|
||||
if (success === false) {
|
||||
throw new Error(`Method handler for ${attrs.path} already provided.`);
|
||||
@ -162,7 +163,7 @@ export class Server {
|
||||
this.http2Server = http2.createServer();
|
||||
}
|
||||
|
||||
// TODO(cjihrig): Set up the handlers, to allow requests to be processed.
|
||||
this._setupHandlers();
|
||||
|
||||
function onError(err: Error): void {
|
||||
callback(err, -1);
|
||||
@ -227,4 +228,107 @@ export class Server {
|
||||
addHttp2Port(): void {
|
||||
throw new Error('Not yet implemented');
|
||||
}
|
||||
|
||||
private _setupHandlers(): void {
|
||||
if (this.http2Server === null) {
|
||||
return;
|
||||
}
|
||||
|
||||
this.http2Server.on(
|
||||
'stream',
|
||||
(stream: http2.ServerHttp2Stream,
|
||||
headers: http2.IncomingHttpHeaders) => {
|
||||
if (this.started !== true) {
|
||||
stream.end();
|
||||
return;
|
||||
}
|
||||
|
||||
try {
|
||||
const path = headers[http2.constants.HTTP2_HEADER_PATH] as string;
|
||||
const handler = this.handlers.get(path);
|
||||
|
||||
if (handler === undefined) {
|
||||
throw unimplementedStatusResponse;
|
||||
}
|
||||
|
||||
const call = new Http2ServerCallStream(stream, handler);
|
||||
const metadata: Metadata =
|
||||
call.receiveMetadata(headers) as Metadata;
|
||||
|
||||
switch (handler.type) {
|
||||
case 'unary':
|
||||
handleUnary(call, handler, metadata);
|
||||
break;
|
||||
case 'clientStream':
|
||||
handleClientStreaming(call, handler, metadata);
|
||||
break;
|
||||
case 'serverStream':
|
||||
handleServerStreaming(call, handler, metadata);
|
||||
break;
|
||||
case 'bidi':
|
||||
handleBidiStreaming(call, handler, metadata);
|
||||
break;
|
||||
default:
|
||||
throw new Error(`Unknown handler type: ${handler.type}`);
|
||||
}
|
||||
} catch (err) {
|
||||
const call = new Http2ServerCallStream(stream, null);
|
||||
call.sendError(err, Status.INTERNAL);
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
async function handleUnary<RequestType, ResponseType>(
|
||||
call: Http2ServerCallStream<RequestType, ResponseType>,
|
||||
handler: Handler<RequestType, ResponseType>,
|
||||
metadata: Metadata): Promise<void> {
|
||||
const emitter =
|
||||
new ServerUnaryCallImpl<RequestType, ResponseType>(call, metadata);
|
||||
const request = await call.receiveUnaryMessage();
|
||||
|
||||
if (request === undefined || call.cancelled === true) {
|
||||
return;
|
||||
}
|
||||
|
||||
emitter.request = request;
|
||||
handler.func(
|
||||
emitter,
|
||||
(err: ServiceError|null, value: ResponseType|null, trailer?: Metadata,
|
||||
flags?: number) => {
|
||||
call.sendUnaryMessage(err, value, trailer, flags);
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
function handleClientStreaming<RequestType, ResponseType>(
|
||||
call: Http2ServerCallStream<RequestType, ResponseType>,
|
||||
handler: Handler<RequestType, ResponseType>, metadata: Metadata): void {
|
||||
throw new Error('not implemented yet');
|
||||
}
|
||||
|
||||
|
||||
async function handleServerStreaming<RequestType, ResponseType>(
|
||||
call: Http2ServerCallStream<RequestType, ResponseType>,
|
||||
handler: Handler<RequestType, ResponseType>,
|
||||
metadata: Metadata): Promise<void> {
|
||||
const request = await call.receiveUnaryMessage();
|
||||
|
||||
if (request === undefined || call.cancelled === true) {
|
||||
return;
|
||||
}
|
||||
|
||||
const stream = new ServerWritableStreamImpl<RequestType, ResponseType>(
|
||||
call, metadata, handler.serialize);
|
||||
|
||||
stream.request = request;
|
||||
handler.func(stream);
|
||||
}
|
||||
|
||||
|
||||
function handleBidiStreaming<RequestType, ResponseType>(
|
||||
call: Http2ServerCallStream<RequestType, ResponseType>,
|
||||
handler: Handler<RequestType, ResponseType>, metadata: Metadata): void {
|
||||
throw new Error('not implemented yet');
|
||||
}
|
||||
|
||||
33
packages/grpc-js/test/fixtures/echo_service.proto
vendored
Normal file
33
packages/grpc-js/test/fixtures/echo_service.proto
vendored
Normal file
@ -0,0 +1,33 @@
|
||||
/*
|
||||
* Copyright 2019 gRPC authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*
|
||||
*/
|
||||
|
||||
syntax = "proto3";
|
||||
|
||||
message EchoMessage {
|
||||
string value = 1;
|
||||
int32 value2 = 2;
|
||||
}
|
||||
|
||||
service EchoService {
|
||||
rpc Echo (EchoMessage) returns (EchoMessage);
|
||||
|
||||
rpc EchoClientStream (stream EchoMessage) returns (EchoMessage);
|
||||
|
||||
rpc EchoServerStream (EchoMessage) returns (stream EchoMessage);
|
||||
|
||||
rpc EchoBidiStream (stream EchoMessage) returns (stream EchoMessage);
|
||||
}
|
||||
41
packages/grpc-js/test/fixtures/test_service.proto
vendored
Normal file
41
packages/grpc-js/test/fixtures/test_service.proto
vendored
Normal file
@ -0,0 +1,41 @@
|
||||
/*
|
||||
* Copyright 2019 gRPC authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*
|
||||
*/
|
||||
|
||||
syntax = "proto3";
|
||||
|
||||
message Request {
|
||||
bool error = 1;
|
||||
string message = 2;
|
||||
}
|
||||
|
||||
message Response {
|
||||
int32 count = 1;
|
||||
}
|
||||
|
||||
service TestService {
|
||||
rpc Unary (Request) returns (Response) {
|
||||
}
|
||||
|
||||
rpc ClientStream (stream Request) returns (Response) {
|
||||
}
|
||||
|
||||
rpc ServerStream (Request) returns (stream Response) {
|
||||
}
|
||||
|
||||
rpc BidiStream (stream Request) returns (stream Response) {
|
||||
}
|
||||
}
|
||||
522
packages/grpc-js/test/test-server-errors.ts
Normal file
522
packages/grpc-js/test/test-server-errors.ts
Normal file
@ -0,0 +1,522 @@
|
||||
/*
|
||||
* Copyright 2019 gRPC authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*
|
||||
*/
|
||||
|
||||
// Allow `any` data type for testing runtime type checking.
|
||||
// tslint:disable no-any
|
||||
import * as assert from 'assert';
|
||||
import {join} from 'path';
|
||||
|
||||
import * as grpc from '../src';
|
||||
import {ServiceError} from '../src/call';
|
||||
import {ServiceClient, ServiceClientConstructor} from '../src/make-client';
|
||||
import {Server} from '../src/server';
|
||||
import {sendUnaryData, ServerDuplexStream, ServerReadableStream, ServerUnaryCall, ServerWritableStream} from '../src/server-call';
|
||||
|
||||
import {loadProtoFile} from './common';
|
||||
|
||||
const protoFile = join(__dirname, 'fixtures', 'test_service.proto');
|
||||
const testServiceDef = loadProtoFile(protoFile);
|
||||
const testServiceClient =
|
||||
testServiceDef.TestService as ServiceClientConstructor;
|
||||
const clientInsecureCreds = grpc.credentials.createInsecure();
|
||||
const serverInsecureCreds = grpc.ServerCredentials.createInsecure();
|
||||
|
||||
|
||||
describe('Client malformed response handling', () => {
|
||||
let server: Server;
|
||||
let client: ServiceClient;
|
||||
const badArg = Buffer.from([0xFF]);
|
||||
|
||||
before((done) => {
|
||||
const malformedTestService = {
|
||||
unary: {
|
||||
path: '/TestService/Unary',
|
||||
requestStream: false,
|
||||
responseStream: false,
|
||||
requestDeserialize: identity,
|
||||
responseSerialize: identity
|
||||
},
|
||||
clientStream: {
|
||||
path: '/TestService/ClientStream',
|
||||
requestStream: true,
|
||||
responseStream: false,
|
||||
requestDeserialize: identity,
|
||||
responseSerialize: identity
|
||||
},
|
||||
serverStream: {
|
||||
path: '/TestService/ServerStream',
|
||||
requestStream: false,
|
||||
responseStream: true,
|
||||
requestDeserialize: identity,
|
||||
responseSerialize: identity
|
||||
},
|
||||
bidiStream: {
|
||||
path: '/TestService/BidiStream',
|
||||
requestStream: true,
|
||||
responseStream: true,
|
||||
requestDeserialize: identity,
|
||||
responseSerialize: identity
|
||||
}
|
||||
} as any;
|
||||
|
||||
server = new Server();
|
||||
|
||||
server.addService(malformedTestService, {
|
||||
unary(call: ServerUnaryCall<any, any>, cb: sendUnaryData<any>) {
|
||||
cb(null, badArg);
|
||||
},
|
||||
|
||||
clientStream(
|
||||
stream: ServerReadableStream<any, any>, cb: sendUnaryData<any>) {
|
||||
stream.on('data', noop);
|
||||
stream.on('end', () => {
|
||||
cb(null, badArg);
|
||||
});
|
||||
},
|
||||
|
||||
serverStream(stream: ServerWritableStream<any, any>) {
|
||||
stream.write(badArg);
|
||||
stream.end();
|
||||
},
|
||||
|
||||
bidiStream(stream: ServerDuplexStream<any, any>) {
|
||||
stream.on('data', () => {
|
||||
// Ignore requests
|
||||
stream.write(badArg);
|
||||
});
|
||||
|
||||
stream.on('end', () => {
|
||||
stream.end();
|
||||
});
|
||||
}
|
||||
});
|
||||
|
||||
server.bindAsync('localhost:0', serverInsecureCreds, (err, port) => {
|
||||
assert.ifError(err);
|
||||
client = new testServiceClient(`localhost:${port}`, clientInsecureCreds);
|
||||
server.start();
|
||||
done();
|
||||
});
|
||||
});
|
||||
|
||||
after((done) => {
|
||||
client.close();
|
||||
server.tryShutdown(done);
|
||||
});
|
||||
|
||||
it('should get an INTERNAL status with a unary call', (done) => {
|
||||
client.unary({}, (err: ServiceError, data: any) => {
|
||||
assert(err);
|
||||
assert.strictEqual(err.code, grpc.status.INTERNAL);
|
||||
done();
|
||||
});
|
||||
});
|
||||
|
||||
it('should get an INTERNAL status with a server stream call', (done) => {
|
||||
const call = client.serverStream({});
|
||||
|
||||
call.on('data', noop);
|
||||
call.on('error', (err: ServiceError) => {
|
||||
assert(err);
|
||||
assert.strictEqual(err.code, grpc.status.INTERNAL);
|
||||
done();
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
describe('Server serialization failure handling', () => {
|
||||
let client: ServiceClient;
|
||||
let server: Server;
|
||||
|
||||
before((done) => {
|
||||
function serializeFail(obj: any) {
|
||||
throw new Error('Serialization failed');
|
||||
}
|
||||
|
||||
const malformedTestService = {
|
||||
unary: {
|
||||
path: '/TestService/Unary',
|
||||
requestStream: false,
|
||||
responseStream: false,
|
||||
requestDeserialize: identity,
|
||||
responseSerialize: serializeFail
|
||||
},
|
||||
clientStream: {
|
||||
path: '/TestService/ClientStream',
|
||||
requestStream: true,
|
||||
responseStream: false,
|
||||
requestDeserialize: identity,
|
||||
responseSerialize: serializeFail
|
||||
},
|
||||
serverStream: {
|
||||
path: '/TestService/ServerStream',
|
||||
requestStream: false,
|
||||
responseStream: true,
|
||||
requestDeserialize: identity,
|
||||
responseSerialize: serializeFail
|
||||
},
|
||||
bidiStream: {
|
||||
path: '/TestService/BidiStream',
|
||||
requestStream: true,
|
||||
responseStream: true,
|
||||
requestDeserialize: identity,
|
||||
responseSerialize: serializeFail
|
||||
}
|
||||
};
|
||||
|
||||
server = new Server();
|
||||
server.addService(malformedTestService as any, {
|
||||
unary(call: ServerUnaryCall<any, any>, cb: sendUnaryData<any>) {
|
||||
cb(null, {});
|
||||
},
|
||||
|
||||
clientStream(
|
||||
stream: ServerReadableStream<any, any>, cb: sendUnaryData<any>) {
|
||||
stream.on('data', noop);
|
||||
stream.on('end', () => {
|
||||
cb(null, {});
|
||||
});
|
||||
},
|
||||
|
||||
serverStream(stream: ServerWritableStream<any, any>) {
|
||||
stream.write({});
|
||||
stream.end();
|
||||
},
|
||||
|
||||
bidiStream(stream: ServerDuplexStream<any, any>) {
|
||||
stream.on('data', () => {
|
||||
// Ignore requests
|
||||
stream.write({});
|
||||
});
|
||||
stream.on('end', () => {
|
||||
stream.end();
|
||||
});
|
||||
}
|
||||
});
|
||||
|
||||
server.bindAsync('localhost:0', serverInsecureCreds, (err, port) => {
|
||||
assert.ifError(err);
|
||||
client = new testServiceClient(`localhost:${port}`, clientInsecureCreds);
|
||||
server.start();
|
||||
done();
|
||||
});
|
||||
});
|
||||
|
||||
after((done) => {
|
||||
client.close();
|
||||
server.tryShutdown(done);
|
||||
});
|
||||
|
||||
it('should get an INTERNAL status with a unary call', (done) => {
|
||||
client.unary({}, (err: ServiceError, data: any) => {
|
||||
assert(err);
|
||||
assert.strictEqual(err.code, grpc.status.INTERNAL);
|
||||
done();
|
||||
});
|
||||
});
|
||||
|
||||
it('should get an INTERNAL status with a server stream call', (done) => {
|
||||
const call = client.serverStream({});
|
||||
|
||||
call.on('data', noop);
|
||||
call.on('error', (err: ServiceError) => {
|
||||
assert(err);
|
||||
assert.strictEqual(err.code, grpc.status.INTERNAL);
|
||||
done();
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
|
||||
describe('Other conditions', () => {
|
||||
let client: ServiceClient;
|
||||
let server: Server;
|
||||
let port: number;
|
||||
|
||||
before((done) => {
|
||||
const trailerMetadata = new grpc.Metadata();
|
||||
|
||||
server = new Server();
|
||||
trailerMetadata.add('trailer-present', 'yes');
|
||||
|
||||
server.addService(testServiceClient.service, {
|
||||
unary(call: ServerUnaryCall<any, any>, cb: sendUnaryData<any>) {
|
||||
const req = call.request;
|
||||
|
||||
if (req.error) {
|
||||
const details = req.message || 'Requested error';
|
||||
|
||||
cb({code: grpc.status.UNKNOWN, details} as ServiceError, null,
|
||||
trailerMetadata);
|
||||
} else {
|
||||
cb(null, {count: 1}, trailerMetadata);
|
||||
}
|
||||
},
|
||||
|
||||
clientStream(
|
||||
stream: ServerReadableStream<any, any>, cb: sendUnaryData<any>) {
|
||||
let count = 0;
|
||||
let errored = false;
|
||||
|
||||
stream.on('data', (data: any) => {
|
||||
if (data.error) {
|
||||
const message = data.message || 'Requested error';
|
||||
errored = true;
|
||||
cb(new Error(message) as ServiceError, null, trailerMetadata);
|
||||
} else {
|
||||
count++;
|
||||
}
|
||||
});
|
||||
|
||||
stream.on('end', () => {
|
||||
if (!errored) {
|
||||
cb(null, {count}, trailerMetadata);
|
||||
}
|
||||
});
|
||||
},
|
||||
|
||||
serverStream(stream: ServerWritableStream<any, any>) {
|
||||
const req = stream.request;
|
||||
|
||||
if (req.error) {
|
||||
stream.emit('error', {
|
||||
code: grpc.status.UNKNOWN,
|
||||
details: req.message || 'Requested error',
|
||||
metadata: trailerMetadata
|
||||
});
|
||||
} else {
|
||||
for (let i = 0; i < 5; i++) {
|
||||
stream.write({count: i});
|
||||
}
|
||||
|
||||
stream.end(trailerMetadata);
|
||||
}
|
||||
},
|
||||
|
||||
bidiStream(stream: ServerDuplexStream<any, any>) {
|
||||
let count = 0;
|
||||
stream.on('data', (data: any) => {
|
||||
if (data.error) {
|
||||
const message = data.message || 'Requested error';
|
||||
const err = new Error(message) as ServiceError;
|
||||
|
||||
err.metadata = trailerMetadata.clone();
|
||||
err.metadata.add('count', '' + count);
|
||||
stream.emit('error', err);
|
||||
} else {
|
||||
stream.write({count});
|
||||
count++;
|
||||
}
|
||||
});
|
||||
|
||||
stream.on('end', () => {
|
||||
stream.end(trailerMetadata);
|
||||
});
|
||||
}
|
||||
});
|
||||
|
||||
server.bindAsync('localhost:0', serverInsecureCreds, (err, _port) => {
|
||||
assert.ifError(err);
|
||||
port = _port;
|
||||
client = new testServiceClient(`localhost:${port}`, clientInsecureCreds);
|
||||
server.start();
|
||||
done();
|
||||
});
|
||||
});
|
||||
|
||||
after((done) => {
|
||||
client.close();
|
||||
server.tryShutdown(done);
|
||||
});
|
||||
|
||||
describe('Server receiving bad input', () => {
|
||||
let misbehavingClient: ServiceClient;
|
||||
const badArg = Buffer.from([0xFF]);
|
||||
|
||||
before(() => {
|
||||
const testServiceAttrs = {
|
||||
unary: {
|
||||
path: '/TestService/Unary',
|
||||
requestStream: false,
|
||||
responseStream: false,
|
||||
requestSerialize: identity,
|
||||
responseDeserialize: identity
|
||||
},
|
||||
clientStream: {
|
||||
path: '/TestService/ClientStream',
|
||||
requestStream: true,
|
||||
responseStream: false,
|
||||
requestSerialize: identity,
|
||||
responseDeserialize: identity
|
||||
},
|
||||
serverStream: {
|
||||
path: '/TestService/ServerStream',
|
||||
requestStream: false,
|
||||
responseStream: true,
|
||||
requestSerialize: identity,
|
||||
responseDeserialize: identity
|
||||
},
|
||||
bidiStream: {
|
||||
path: '/TestService/BidiStream',
|
||||
requestStream: true,
|
||||
responseStream: true,
|
||||
requestSerialize: identity,
|
||||
responseDeserialize: identity
|
||||
}
|
||||
} as any;
|
||||
|
||||
const client =
|
||||
grpc.makeGenericClientConstructor(testServiceAttrs, 'TestService');
|
||||
|
||||
misbehavingClient = new client(`localhost:${port}`, clientInsecureCreds);
|
||||
});
|
||||
|
||||
after(() => {
|
||||
misbehavingClient.close();
|
||||
});
|
||||
|
||||
it('should respond correctly to a unary call', (done) => {
|
||||
misbehavingClient.unary(badArg, (err: ServiceError, data: any) => {
|
||||
assert(err);
|
||||
assert.strictEqual(err.code, grpc.status.INTERNAL);
|
||||
done();
|
||||
});
|
||||
});
|
||||
|
||||
it('should respond correctly to a server stream', (done) => {
|
||||
const call = misbehavingClient.serverStream(badArg);
|
||||
|
||||
call.on('data', (data: any) => {
|
||||
assert.fail(data);
|
||||
});
|
||||
|
||||
call.on('error', (err: ServiceError) => {
|
||||
assert(err);
|
||||
assert.strictEqual(err.code, grpc.status.INTERNAL);
|
||||
done();
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
describe('Trailing metadata', () => {
|
||||
it('should be present when a unary call succeeds', (done) => {
|
||||
let count = 0;
|
||||
const call =
|
||||
client.unary({error: false}, (err: ServiceError, data: any) => {
|
||||
assert.ifError(err);
|
||||
|
||||
count++;
|
||||
if (count === 2) {
|
||||
done();
|
||||
}
|
||||
});
|
||||
|
||||
call.on('status', (status: grpc.StatusObject) => {
|
||||
assert.deepStrictEqual(status.metadata.get('trailer-present'), ['yes']);
|
||||
|
||||
count++;
|
||||
if (count === 2) {
|
||||
done();
|
||||
}
|
||||
});
|
||||
});
|
||||
|
||||
it('should be present when a unary call fails', (done) => {
|
||||
let count = 0;
|
||||
const call =
|
||||
client.unary({error: true}, (err: ServiceError, data: any) => {
|
||||
assert(err);
|
||||
|
||||
count++;
|
||||
if (count === 2) {
|
||||
done();
|
||||
}
|
||||
});
|
||||
|
||||
call.on('status', (status: grpc.StatusObject) => {
|
||||
assert.deepStrictEqual(status.metadata.get('trailer-present'), ['yes']);
|
||||
|
||||
count++;
|
||||
if (count === 2) {
|
||||
done();
|
||||
}
|
||||
});
|
||||
});
|
||||
|
||||
it('should be present when a server stream call succeeds', (done) => {
|
||||
const call = client.serverStream({error: false});
|
||||
|
||||
call.on('data', noop);
|
||||
call.on('status', (status: grpc.StatusObject) => {
|
||||
assert.strictEqual(status.code, grpc.status.OK);
|
||||
assert.deepStrictEqual(status.metadata.get('trailer-present'), ['yes']);
|
||||
done();
|
||||
});
|
||||
});
|
||||
|
||||
it('should be present when a server stream call fails', (done) => {
|
||||
const call = client.serverStream({error: true});
|
||||
|
||||
call.on('data', noop);
|
||||
call.on('error', (error: ServiceError) => {
|
||||
assert.deepStrictEqual(error.metadata.get('trailer-present'), ['yes']);
|
||||
done();
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
describe('Error object should contain the status', () => {
|
||||
it('for a unary call', (done) => {
|
||||
client.unary({error: true}, (err: ServiceError, data: any) => {
|
||||
assert(err);
|
||||
assert.strictEqual(err.code, grpc.status.UNKNOWN);
|
||||
assert.strictEqual(err.details, 'Requested error');
|
||||
done();
|
||||
});
|
||||
});
|
||||
|
||||
it('for a server stream call', (done) => {
|
||||
const call = client.serverStream({error: true});
|
||||
|
||||
call.on('data', noop);
|
||||
call.on('error', (error: ServiceError) => {
|
||||
assert.strictEqual(error.code, grpc.status.UNKNOWN);
|
||||
assert.strictEqual(error.details, 'Requested error');
|
||||
done();
|
||||
});
|
||||
});
|
||||
|
||||
it('for a UTF-8 error message', (done) => {
|
||||
client.unary(
|
||||
{error: true, message: '測試字符串'},
|
||||
(err: ServiceError, data: any) => {
|
||||
assert(err);
|
||||
assert.strictEqual(err.code, grpc.status.UNKNOWN);
|
||||
assert.strictEqual(err.details, '測試字符串');
|
||||
done();
|
||||
});
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
|
||||
function identity(arg: any): any {
|
||||
return arg;
|
||||
}
|
||||
|
||||
|
||||
function noop(): void {}
|
||||
@ -21,8 +21,12 @@ import * as assert from 'assert';
|
||||
import * as fs from 'fs';
|
||||
import * as path from 'path';
|
||||
|
||||
import * as grpc from '../src';
|
||||
import {ServerCredentials} from '../src';
|
||||
import {ServiceError} from '../src/call';
|
||||
import {ServiceClient, ServiceClientConstructor} from '../src/make-client';
|
||||
import {Server} from '../src/server';
|
||||
import {sendUnaryData, ServerDuplexStream, ServerReadableStream, ServerUnaryCall, ServerWritableStream} from '../src/server-call';
|
||||
|
||||
import {loadProtoFile} from './common';
|
||||
|
||||
@ -228,4 +232,158 @@ describe('Server', () => {
|
||||
server.bind('localhost:0', ServerCredentials.createInsecure());
|
||||
}, /Not implemented. Use bindAsync\(\) instead/);
|
||||
});
|
||||
|
||||
describe('Default handlers', () => {
|
||||
let server: Server;
|
||||
let client: ServiceClient;
|
||||
|
||||
const mathProtoFile = path.join(__dirname, 'fixtures', 'math.proto');
|
||||
const mathClient = (loadProtoFile(mathProtoFile).math as any).Math;
|
||||
const mathServiceAttrs = mathClient.service;
|
||||
|
||||
beforeEach((done) => {
|
||||
server = new Server();
|
||||
server.addService(mathServiceAttrs, {});
|
||||
server.bindAsync(
|
||||
'localhost:0', ServerCredentials.createInsecure(), (err, port) => {
|
||||
assert.ifError(err);
|
||||
client = new mathClient(
|
||||
`localhost:${port}`, grpc.credentials.createInsecure());
|
||||
server.start();
|
||||
done();
|
||||
});
|
||||
});
|
||||
|
||||
it('should respond to a unary call with UNIMPLEMENTED', (done) => {
|
||||
client.div(
|
||||
{divisor: 4, dividend: 3}, (error: ServiceError, response: any) => {
|
||||
assert(error);
|
||||
assert.strictEqual(error.code, grpc.status.UNIMPLEMENTED);
|
||||
done();
|
||||
});
|
||||
});
|
||||
|
||||
it('should respond to a server stream with UNIMPLEMENTED', (done) => {
|
||||
const call = client.fib({limit: 5});
|
||||
|
||||
call.on('data', (value: any) => {
|
||||
assert.fail('No messages expected');
|
||||
});
|
||||
|
||||
call.on('error', (err: ServiceError) => {
|
||||
assert(err);
|
||||
assert.strictEqual(err.code, grpc.status.UNIMPLEMENTED);
|
||||
done();
|
||||
});
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
describe('Echo service', () => {
|
||||
let server: Server;
|
||||
let client: ServiceClient;
|
||||
|
||||
before((done) => {
|
||||
const protoFile = path.join(__dirname, 'fixtures', 'echo_service.proto');
|
||||
const echoService =
|
||||
loadProtoFile(protoFile).EchoService as ServiceClientConstructor;
|
||||
|
||||
server = new Server();
|
||||
server.addService(echoService.service, {
|
||||
echo(call: ServerUnaryCall<any, any>, callback: sendUnaryData<any>) {
|
||||
callback(null, call.request);
|
||||
}
|
||||
});
|
||||
|
||||
server.bindAsync(
|
||||
'localhost:0', ServerCredentials.createInsecure(), (err, port) => {
|
||||
assert.ifError(err);
|
||||
client = new echoService(
|
||||
`localhost:${port}`, grpc.credentials.createInsecure());
|
||||
server.start();
|
||||
done();
|
||||
});
|
||||
});
|
||||
|
||||
after((done) => {
|
||||
client.close();
|
||||
server.tryShutdown(done);
|
||||
});
|
||||
|
||||
it('should echo the recieved message directly', (done) => {
|
||||
client.echo(
|
||||
{value: 'test value', value2: 3},
|
||||
(error: ServiceError, response: any) => {
|
||||
assert.ifError(error);
|
||||
assert.deepStrictEqual(response, {value: 'test value', value2: 3});
|
||||
done();
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
describe('Generic client and server', () => {
|
||||
function toString(val: any) {
|
||||
return val.toString();
|
||||
}
|
||||
|
||||
function toBuffer(str: string) {
|
||||
return Buffer.from(str);
|
||||
}
|
||||
|
||||
function capitalize(str: string) {
|
||||
return str.charAt(0).toUpperCase() + str.slice(1);
|
||||
}
|
||||
|
||||
const stringServiceAttrs = {
|
||||
capitalize: {
|
||||
path: '/string/capitalize',
|
||||
requestStream: false,
|
||||
responseStream: false,
|
||||
requestSerialize: toBuffer,
|
||||
requestDeserialize: toString,
|
||||
responseSerialize: toBuffer,
|
||||
responseDeserialize: toString
|
||||
}
|
||||
};
|
||||
|
||||
describe('String client and server', () => {
|
||||
let client: ServiceClient;
|
||||
let server: Server;
|
||||
|
||||
before((done) => {
|
||||
server = new Server();
|
||||
|
||||
server.addService(stringServiceAttrs as any, {
|
||||
capitalize(
|
||||
call: ServerUnaryCall<any, any>, callback: sendUnaryData<any>) {
|
||||
callback(null, capitalize(call.request));
|
||||
}
|
||||
});
|
||||
|
||||
server.bindAsync(
|
||||
'localhost:0', ServerCredentials.createInsecure(), (err, port) => {
|
||||
assert.ifError(err);
|
||||
server.start();
|
||||
const clientConstr = grpc.makeGenericClientConstructor(
|
||||
stringServiceAttrs as any,
|
||||
'unused_but_lets_appease_typescript_anyway');
|
||||
client = new clientConstr(
|
||||
`localhost:${port}`, grpc.credentials.createInsecure());
|
||||
done();
|
||||
});
|
||||
});
|
||||
|
||||
after((done) => {
|
||||
client.close();
|
||||
server.tryShutdown(done);
|
||||
});
|
||||
|
||||
it('Should respond with a capitalized string', (done) => {
|
||||
client.capitalize('abc', (err: ServiceError, response: string) => {
|
||||
assert.ifError(err);
|
||||
assert.strictEqual(response, 'Abc');
|
||||
done();
|
||||
});
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user