Merge pull request #832 from cjihrig/server

grpc-js: support unary and server streaming rpcs
This commit is contained in:
Michael Lumish 2019-05-03 13:30:57 -07:00 committed by GitHub
commit f7f098b117
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 1263 additions and 60 deletions

View File

@ -16,62 +16,75 @@
*/
import {EventEmitter} from 'events';
import * as http2 from 'http2';
import {Duplex, Readable, Writable} from 'stream';
import {ServiceError} from './call';
import {StatusObject} from './call-stream';
import {Status} from './constants';
import {Deserialize, Serialize} from './make-client';
import {Metadata} from './metadata';
function noop(): void {}
export class ServerUnaryCall<RequestType> extends EventEmitter {
cancelled: boolean;
request: RequestType|null;
export type PartialServiceError = Partial<ServiceError>;
constructor(private call: ServerCall, public metadata: Metadata) {
super();
this.cancelled = false;
this.request = null; // TODO(cjihrig): Read the unary request here.
}
type DeadlineUnitIndexSignature = {
[name: string]: number
};
getPeer(): string {
throw new Error('not implemented yet');
}
sendMetadata(responseMetadata: Metadata): void {
throw new Error('not implemented yet');
}
}
const GRPC_ACCEPT_ENCODING_HEADER = 'grpc-accept-encoding';
const GRPC_ENCODING_HEADER = 'grpc-encoding';
const GRPC_MESSAGE_HEADER = 'grpc-message';
const GRPC_STATUS_HEADER = 'grpc-status';
const GRPC_TIMEOUT_HEADER = 'grpc-timeout';
const DEADLINE_REGEX = /(\d{1,8})\s*([HMSmun])/;
const deadlineUnitsToMs: DeadlineUnitIndexSignature = {
H: 3600000,
M: 60000,
S: 1000,
m: 1,
u: 0.001,
n: 0.000001
};
const defaultResponseHeaders = {
// TODO(cjihrig): Remove these encoding headers from the default response
// once compression is integrated.
[GRPC_ACCEPT_ENCODING_HEADER]: 'identity',
[GRPC_ENCODING_HEADER]: 'identity',
[http2.constants.HTTP2_HEADER_STATUS]: http2.constants.HTTP_STATUS_OK,
[http2.constants.HTTP2_HEADER_CONTENT_TYPE]: 'application/grpc+proto'
};
const defaultResponseOptions = {
waitForTrailers: true
} as http2.ServerStreamResponseOptions;
export class ServerReadableStream<RequestType> extends Readable {
cancelled: boolean;
export type ServerSurfaceCall = {
cancelled: boolean; getPeer(): string;
sendMetadata(responseMetadata: Metadata): void
};
constructor(
private call: ServerCall, public metadata: Metadata,
private deserialize: Deserialize<RequestType>) {
super();
this.cancelled = false;
}
export type ServerUnaryCall<RequestType, ResponseType> =
ServerSurfaceCall&{request: RequestType | null};
export type ServerReadableStream<RequestType, ResponseType> =
ServerSurfaceCall&Readable;
export type ServerWritableStream<RequestType, ResponseType> =
ServerSurfaceCall&Writable&{request: RequestType | null};
export type ServerDuplexStream<RequestType, ResponseType> =
ServerSurfaceCall&Duplex;
getPeer(): string {
throw new Error('not implemented yet');
}
sendMetadata(responseMetadata: Metadata): void {
throw new Error('not implemented yet');
}
}
export class ServerWritableStream<RequestType, ResponseType> extends Writable {
export class ServerUnaryCallImpl<RequestType, ResponseType> extends EventEmitter
implements ServerUnaryCall<RequestType, ResponseType> {
cancelled: boolean;
request: RequestType|null;
constructor(
private call: ServerCall, public metadata: Metadata,
private serialize: Serialize<ResponseType>) {
private call: Http2ServerCallStream<RequestType, ResponseType>,
public metadata: Metadata) {
super();
this.cancelled = false;
this.request = null; // TODO(cjihrig): Read the unary request here.
this.request = null;
}
getPeer(): string {
@ -79,19 +92,20 @@ export class ServerWritableStream<RequestType, ResponseType> extends Writable {
}
sendMetadata(responseMetadata: Metadata): void {
throw new Error('not implemented yet');
this.call.sendMetadata(responseMetadata);
}
}
export class ServerDuplexStream<RequestType, ResponseType> extends Duplex {
export class ServerReadableStreamImpl<RequestType, ResponseType> extends
Readable implements ServerReadableStream<RequestType, ResponseType> {
cancelled: boolean;
constructor(
private call: ServerCall, public metadata: Metadata,
private serialize: Serialize<ResponseType>,
private deserialize: Deserialize<RequestType>) {
super();
private call: Http2ServerCallStream<RequestType, ResponseType>,
public metadata: Metadata,
private _deserialize: Deserialize<RequestType>) {
super({objectMode: true});
this.cancelled = false;
}
@ -100,13 +114,103 @@ export class ServerDuplexStream<RequestType, ResponseType> extends Duplex {
}
sendMetadata(responseMetadata: Metadata): void {
throw new Error('not implemented yet');
this.call.sendMetadata(responseMetadata);
}
}
// Internal class that wraps the HTTP2 request.
export class ServerCall {}
export class ServerWritableStreamImpl<RequestType, ResponseType> extends
Writable implements ServerWritableStream<RequestType, ResponseType> {
cancelled: boolean;
request: RequestType|null;
private trailingMetadata: Metadata;
constructor(
private call: Http2ServerCallStream<RequestType, ResponseType>,
public metadata: Metadata, private _serialize: Serialize<ResponseType>) {
super({objectMode: true});
this.cancelled = false;
this.request = null;
this.trailingMetadata = new Metadata();
this.on('error', (err) => {
this.call.sendError(err as ServiceError);
this.end();
});
}
getPeer(): string {
throw new Error('not implemented yet');
}
sendMetadata(responseMetadata: Metadata): void {
this.call.sendMetadata(responseMetadata);
}
async _write(
chunk: ResponseType, encoding: string,
// tslint:disable-next-line:no-any
callback: (...args: any[]) => void) {
try {
const response = await this.call.serializeMessage(chunk);
if (!this.call.write(response)) {
this.call.once('drain', callback);
return;
}
} catch (err) {
err.code = Status.INTERNAL;
this.emit('error', err);
}
callback();
}
_final(callback: Function): void {
this.call.sendStatus(
{code: Status.OK, details: 'OK', metadata: this.trailingMetadata});
callback(null);
}
// tslint:disable-next-line:no-any
end(metadata?: any) {
if (metadata) {
this.trailingMetadata = metadata;
}
super.end();
}
serialize(input: ResponseType): Buffer|null {
if (input === null || input === undefined) {
return null;
}
return this._serialize(input);
}
}
export class ServerDuplexStreamImpl<RequestType, ResponseType> extends Duplex
implements ServerDuplexStream<RequestType, ResponseType> {
cancelled: boolean;
constructor(
private call: Http2ServerCallStream<RequestType, ResponseType>,
public metadata: Metadata, private _serialize: Serialize<ResponseType>,
private _deserialize: Deserialize<RequestType>) {
super({objectMode: true});
this.cancelled = false;
}
getPeer(): string {
throw new Error('not implemented yet');
}
sendMetadata(responseMetadata: Metadata): void {
this.call.sendMetadata(responseMetadata);
}
}
// Unary response callback signature.
@ -116,12 +220,12 @@ export type sendUnaryData<ResponseType> =
// User provided handler for unary calls.
export type handleUnaryCall<RequestType, ResponseType> =
(call: ServerUnaryCall<RequestType>,
(call: ServerUnaryCall<RequestType, ResponseType>,
callback: sendUnaryData<ResponseType>) => void;
// User provided handler for client streaming calls.
export type handleClientStreamingCall<RequestType, ResponseType> =
(call: ServerReadableStream<RequestType>,
(call: ServerReadableStream<RequestType, ResponseType>,
callback: sendUnaryData<ResponseType>) => void;
// User provided handler for server streaming calls.
@ -138,11 +242,242 @@ export type HandleCall<RequestType, ResponseType> =
handleServerStreamingCall<RequestType, ResponseType>|
handleBidiStreamingCall<RequestType, ResponseType>;
export type Handler<RequestType, ResponseType> = {
func: HandleCall<RequestType, ResponseType>;
export type UnaryHandler<RequestType, ResponseType> = {
func: handleUnaryCall<RequestType, ResponseType>;
serialize: Serialize<ResponseType>;
deserialize: Deserialize<RequestType>;
type: HandlerType;
};
export type ClientStreamingHandler<RequestType, ResponseType> = {
func: handleClientStreamingCall<RequestType, ResponseType>;
serialize: Serialize<ResponseType>;
deserialize: Deserialize<RequestType>;
type: HandlerType;
};
export type ServerStreamingHandler<RequestType, ResponseType> = {
func: handleServerStreamingCall<RequestType, ResponseType>;
serialize: Serialize<ResponseType>;
deserialize: Deserialize<RequestType>;
type: HandlerType;
};
export type BidiStreamingHandler<RequestType, ResponseType> = {
func: handleBidiStreamingCall<RequestType, ResponseType>;
serialize: Serialize<ResponseType>;
deserialize: Deserialize<RequestType>;
type: HandlerType;
};
export type Handler<RequestType, ResponseType> =
UnaryHandler<RequestType, ResponseType>|
ClientStreamingHandler<RequestType, ResponseType>|
ServerStreamingHandler<RequestType, ResponseType>|
BidiStreamingHandler<RequestType, ResponseType>;
export type HandlerType = 'bidi'|'clientStream'|'serverStream'|'unary';
const noopTimer: NodeJS.Timer = setTimeout(() => {}, 0);
// Internal class that wraps the HTTP2 request.
export class Http2ServerCallStream<RequestType, ResponseType> extends
EventEmitter {
cancelled = false;
deadline: NodeJS.Timer = noopTimer;
private wantTrailers = false;
private metadataSent = false;
constructor(
private stream: http2.ServerHttp2Stream,
private handler: Handler<RequestType, ResponseType>) {
super();
this.stream.once('error', (err: ServiceError) => {
err.code = Status.INTERNAL;
this.sendError(err);
});
this.stream.once('close', () => {
if (this.stream.rstCode === http2.constants.NGHTTP2_CANCEL) {
this.cancelled = true;
this.emit('cancelled', 'cancelled');
}
});
this.stream.on('drain', () => {
this.emit('drain');
});
}
sendMetadata(customMetadata?: Metadata) {
if (this.metadataSent) {
return;
}
this.metadataSent = true;
const custom = customMetadata ? customMetadata.toHttp2Headers() : null;
// TODO(cjihrig): Include compression headers.
const headers = Object.assign(defaultResponseHeaders, custom);
this.stream.respond(headers, defaultResponseOptions);
}
receiveMetadata(headers: http2.IncomingHttpHeaders) {
const metadata = Metadata.fromHttp2Headers(headers);
// TODO(cjihrig): Receive compression metadata.
const timeoutHeader = metadata.get(GRPC_TIMEOUT_HEADER);
if (timeoutHeader.length > 0) {
const match = timeoutHeader[0].toString().match(DEADLINE_REGEX);
if (match === null) {
const err = new Error('Invalid deadline') as ServiceError;
err.code = Status.OUT_OF_RANGE;
this.sendError(err);
return;
}
const timeout = (+match[1] * deadlineUnitsToMs[match[2]]) | 0;
this.deadline = setTimeout(handleExpiredDeadline, timeout, this);
metadata.remove(GRPC_TIMEOUT_HEADER);
}
return metadata;
}
receiveUnaryMessage(): Promise<RequestType> {
return new Promise((resolve, reject) => {
const stream = this.stream;
const chunks: Buffer[] = [];
let totalLength = 0;
stream.on('data', (data: Buffer) => {
chunks.push(data);
totalLength += data.byteLength;
});
stream.once('end', async () => {
try {
const requestBytes = Buffer.concat(chunks, totalLength);
resolve(await this.deserializeMessage(requestBytes));
} catch (err) {
err.code = Status.INTERNAL;
this.sendError(err);
resolve();
}
});
});
}
serializeMessage(value: ResponseType) {
const messageBuffer = this.handler.serialize(value);
// TODO(cjihrig): Call compression aware serializeMessage().
const byteLength = messageBuffer.byteLength;
const output = Buffer.allocUnsafe(byteLength + 5);
output.writeUInt8(0, 0);
output.writeUInt32BE(byteLength, 1);
messageBuffer.copy(output, 5);
return output;
}
async deserializeMessage(bytes: Buffer) {
// TODO(cjihrig): Call compression aware deserializeMessage().
const receivedMessage = bytes.slice(5);
return this.handler.deserialize(receivedMessage);
}
async sendUnaryMessage(
err: ServiceError|null, value: ResponseType|null, metadata?: Metadata,
flags?: number) {
if (!metadata) {
metadata = new Metadata();
}
if (err) {
err.metadata = metadata;
this.sendError(err);
return;
}
try {
const response = await this.serializeMessage(value!);
this.write(response);
this.sendStatus({code: Status.OK, details: 'OK', metadata});
} catch (err) {
err.code = Status.INTERNAL;
this.sendError(err);
}
}
sendStatus(statusObj: StatusObject) {
if (this.cancelled) {
return;
}
clearTimeout(this.deadline);
if (!this.wantTrailers) {
this.wantTrailers = true;
this.stream.once('wantTrailers', () => {
const trailersToSend = Object.assign(
{
[GRPC_STATUS_HEADER]: statusObj.code,
[GRPC_MESSAGE_HEADER]: encodeURI(statusObj.details as string)
},
statusObj.metadata.toHttp2Headers());
this.stream.sendTrailers(trailersToSend);
});
this.sendMetadata();
this.stream.end();
}
}
sendError(error: ServiceError) {
const status: StatusObject = {
code: Status.UNKNOWN,
details: error.hasOwnProperty('message') ? error.message :
'Unknown Error',
metadata: error.hasOwnProperty('metadata') ? error.metadata :
new Metadata()
};
if (error.hasOwnProperty('code') && Number.isInteger(error.code)) {
status.code = error.code;
if (error.hasOwnProperty('details')) {
status.details = error.details;
}
}
this.sendStatus(status);
}
write(chunk: Buffer) {
if (this.cancelled) {
return;
}
this.sendMetadata();
return this.stream.write(chunk);
}
}
// tslint:disable:no-any
type UntypedServerCall = Http2ServerCallStream<any, any>;
function handleExpiredDeadline(call: UntypedServerCall) {
const err = new Error('Deadline exceeded') as ServiceError;
err.code = Status.DEADLINE_EXCEEDED;
call.sendError(err);
call.cancelled = true;
call.emit('cancelled', 'deadline');
}

View File

@ -20,20 +20,25 @@ import {AddressInfo, ListenOptions} from 'net';
import {URL} from 'url';
import {ServiceError} from './call';
import {StatusObject} from './call-stream';
import {Status} from './constants';
import {Deserialize, Serialize, ServiceDefinition} from './make-client';
import {HandleCall, Handler, HandlerType, sendUnaryData, ServerDuplexStream, ServerReadableStream, ServerUnaryCall, ServerWritableStream} from './server-call';
import {Metadata} from './metadata';
import {BidiStreamingHandler, ClientStreamingHandler, HandleCall, Handler, HandlerType, Http2ServerCallStream, PartialServiceError, sendUnaryData, ServerDuplexStream, ServerDuplexStreamImpl, ServerReadableStream, ServerReadableStreamImpl, ServerStreamingHandler, ServerUnaryCall, ServerUnaryCallImpl, ServerWritableStream, ServerWritableStreamImpl, UnaryHandler} from './server-call';
import {ServerCredentials} from './server-credentials';
function noop(): void {}
type PartialServiceError = Partial<ServiceError>;
const unimplementedStatusResponse: PartialServiceError = {
code: Status.UNIMPLEMENTED,
details: 'The server does not implement this method',
};
// tslint:disable:no-any
type UntypedUnaryHandler = UnaryHandler<any, any>;
type UntypedClientStreamingHandler = ClientStreamingHandler<any, any>;
type UntypedServerStreamingHandler = ServerStreamingHandler<any, any>;
type UntypedBidiStreamingHandler = BidiStreamingHandler<any, any>;
type UntypedHandleCall = HandleCall<any, any>;
type UntypedHandler = Handler<any, any>;
type UntypedServiceImplementation = {
@ -41,10 +46,11 @@ type UntypedServiceImplementation = {
};
const defaultHandler = {
unary(call: ServerUnaryCall<any>, callback: sendUnaryData<any>): void {
unary(call: ServerUnaryCall<any, any>, callback: sendUnaryData<any>): void {
callback(unimplementedStatusResponse as ServiceError, null);
},
clientStream(call: ServerReadableStream<any>, callback: sendUnaryData<any>):
clientStream(
call: ServerReadableStream<any, any>, callback: sendUnaryData<any>):
void {
callback(unimplementedStatusResponse as ServiceError, null);
},
@ -120,8 +126,8 @@ export class Server {
}
const success = this.register(
attrs.path, impl, attrs.responseSerialize, attrs.requestDeserialize,
methodType);
attrs.path, impl as UntypedHandleCall, attrs.responseSerialize,
attrs.requestDeserialize, methodType);
if (success === false) {
throw new Error(`Method handler for ${attrs.path} already provided.`);
@ -162,7 +168,7 @@ export class Server {
this.http2Server = http2.createServer();
}
// TODO(cjihrig): Set up the handlers, to allow requests to be processed.
this._setupHandlers();
function onError(err: Error): void {
callback(err, -1);
@ -193,8 +199,7 @@ export class Server {
}
this.handlers.set(
name,
{func: handler, serialize, deserialize, type: type as HandlerType});
name, {func: handler, serialize, deserialize, type} as UntypedHandler);
return true;
}
@ -227,4 +232,113 @@ export class Server {
addHttp2Port(): void {
throw new Error('Not yet implemented');
}
private _setupHandlers(): void {
if (this.http2Server === null) {
return;
}
this.http2Server.on(
'stream',
(stream: http2.ServerHttp2Stream,
headers: http2.IncomingHttpHeaders) => {
if (!this.started) {
stream.end();
return;
}
try {
const path = headers[http2.constants.HTTP2_HEADER_PATH] as string;
const handler = this.handlers.get(path);
if (handler === undefined) {
throw unimplementedStatusResponse;
}
const call = new Http2ServerCallStream(stream, handler);
const metadata: Metadata =
call.receiveMetadata(headers) as Metadata;
switch (handler.type) {
case 'unary':
handleUnary(call, handler as UntypedUnaryHandler, metadata);
break;
case 'clientStream':
handleClientStreaming(
call, handler as UntypedClientStreamingHandler, metadata);
break;
case 'serverStream':
handleServerStreaming(
call, handler as UntypedServerStreamingHandler, metadata);
break;
case 'bidi':
handleBidiStreaming(
call, handler as UntypedBidiStreamingHandler, metadata);
break;
default:
throw new Error(`Unknown handler type: ${handler.type}`);
}
} catch (err) {
const call = new Http2ServerCallStream(stream, null!);
err.code = Status.INTERNAL;
call.sendError(err);
}
});
}
}
async function handleUnary<RequestType, ResponseType>(
call: Http2ServerCallStream<RequestType, ResponseType>,
handler: UnaryHandler<RequestType, ResponseType>,
metadata: Metadata): Promise<void> {
const emitter =
new ServerUnaryCallImpl<RequestType, ResponseType>(call, metadata);
const request = await call.receiveUnaryMessage();
if (request === undefined || call.cancelled) {
return;
}
emitter.request = request;
handler.func(
emitter,
(err: ServiceError|null, value: ResponseType|null, trailer?: Metadata,
flags?: number) => {
call.sendUnaryMessage(err, value, trailer, flags);
});
}
function handleClientStreaming<RequestType, ResponseType>(
call: Http2ServerCallStream<RequestType, ResponseType>,
handler: ClientStreamingHandler<RequestType, ResponseType>,
metadata: Metadata): void {
throw new Error('not implemented yet');
}
async function handleServerStreaming<RequestType, ResponseType>(
call: Http2ServerCallStream<RequestType, ResponseType>,
handler: ServerStreamingHandler<RequestType, ResponseType>,
metadata: Metadata): Promise<void> {
const request = await call.receiveUnaryMessage();
if (request === undefined || call.cancelled) {
return;
}
const stream = new ServerWritableStreamImpl<RequestType, ResponseType>(
call, metadata, handler.serialize);
stream.request = request;
handler.func(stream);
}
function handleBidiStreaming<RequestType, ResponseType>(
call: Http2ServerCallStream<RequestType, ResponseType>,
handler: BidiStreamingHandler<RequestType, ResponseType>,
metadata: Metadata): void {
throw new Error('not implemented yet');
}

View File

@ -0,0 +1,33 @@
/*
* Copyright 2019 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
syntax = "proto3";
message EchoMessage {
string value = 1;
int32 value2 = 2;
}
service EchoService {
rpc Echo (EchoMessage) returns (EchoMessage);
rpc EchoClientStream (stream EchoMessage) returns (EchoMessage);
rpc EchoServerStream (EchoMessage) returns (stream EchoMessage);
rpc EchoBidiStream (stream EchoMessage) returns (stream EchoMessage);
}

View File

@ -0,0 +1,41 @@
/*
* Copyright 2019 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
syntax = "proto3";
message Request {
bool error = 1;
string message = 2;
}
message Response {
int32 count = 1;
}
service TestService {
rpc Unary (Request) returns (Response) {
}
rpc ClientStream (stream Request) returns (Response) {
}
rpc ServerStream (Request) returns (stream Response) {
}
rpc BidiStream (stream Request) returns (stream Response) {
}
}

View File

@ -0,0 +1,522 @@
/*
* Copyright 2019 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
// Allow `any` data type for testing runtime type checking.
// tslint:disable no-any
import * as assert from 'assert';
import {join} from 'path';
import * as grpc from '../src';
import {ServiceError} from '../src/call';
import {ServiceClient, ServiceClientConstructor} from '../src/make-client';
import {Server} from '../src/server';
import {sendUnaryData, ServerDuplexStream, ServerReadableStream, ServerUnaryCall, ServerWritableStream} from '../src/server-call';
import {loadProtoFile} from './common';
const protoFile = join(__dirname, 'fixtures', 'test_service.proto');
const testServiceDef = loadProtoFile(protoFile);
const testServiceClient =
testServiceDef.TestService as ServiceClientConstructor;
const clientInsecureCreds = grpc.credentials.createInsecure();
const serverInsecureCreds = grpc.ServerCredentials.createInsecure();
describe('Client malformed response handling', () => {
let server: Server;
let client: ServiceClient;
const badArg = Buffer.from([0xFF]);
before((done) => {
const malformedTestService = {
unary: {
path: '/TestService/Unary',
requestStream: false,
responseStream: false,
requestDeserialize: identity,
responseSerialize: identity
},
clientStream: {
path: '/TestService/ClientStream',
requestStream: true,
responseStream: false,
requestDeserialize: identity,
responseSerialize: identity
},
serverStream: {
path: '/TestService/ServerStream',
requestStream: false,
responseStream: true,
requestDeserialize: identity,
responseSerialize: identity
},
bidiStream: {
path: '/TestService/BidiStream',
requestStream: true,
responseStream: true,
requestDeserialize: identity,
responseSerialize: identity
}
} as any;
server = new Server();
server.addService(malformedTestService, {
unary(call: ServerUnaryCall<any, any>, cb: sendUnaryData<any>) {
cb(null, badArg);
},
clientStream(
stream: ServerReadableStream<any, any>, cb: sendUnaryData<any>) {
stream.on('data', noop);
stream.on('end', () => {
cb(null, badArg);
});
},
serverStream(stream: ServerWritableStream<any, any>) {
stream.write(badArg);
stream.end();
},
bidiStream(stream: ServerDuplexStream<any, any>) {
stream.on('data', () => {
// Ignore requests
stream.write(badArg);
});
stream.on('end', () => {
stream.end();
});
}
});
server.bindAsync('localhost:0', serverInsecureCreds, (err, port) => {
assert.ifError(err);
client = new testServiceClient(`localhost:${port}`, clientInsecureCreds);
server.start();
done();
});
});
after((done) => {
client.close();
server.tryShutdown(done);
});
it('should get an INTERNAL status with a unary call', (done) => {
client.unary({}, (err: ServiceError, data: any) => {
assert(err);
assert.strictEqual(err.code, grpc.status.INTERNAL);
done();
});
});
it('should get an INTERNAL status with a server stream call', (done) => {
const call = client.serverStream({});
call.on('data', noop);
call.on('error', (err: ServiceError) => {
assert(err);
assert.strictEqual(err.code, grpc.status.INTERNAL);
done();
});
});
});
describe('Server serialization failure handling', () => {
let client: ServiceClient;
let server: Server;
before((done) => {
function serializeFail(obj: any) {
throw new Error('Serialization failed');
}
const malformedTestService = {
unary: {
path: '/TestService/Unary',
requestStream: false,
responseStream: false,
requestDeserialize: identity,
responseSerialize: serializeFail
},
clientStream: {
path: '/TestService/ClientStream',
requestStream: true,
responseStream: false,
requestDeserialize: identity,
responseSerialize: serializeFail
},
serverStream: {
path: '/TestService/ServerStream',
requestStream: false,
responseStream: true,
requestDeserialize: identity,
responseSerialize: serializeFail
},
bidiStream: {
path: '/TestService/BidiStream',
requestStream: true,
responseStream: true,
requestDeserialize: identity,
responseSerialize: serializeFail
}
};
server = new Server();
server.addService(malformedTestService as any, {
unary(call: ServerUnaryCall<any, any>, cb: sendUnaryData<any>) {
cb(null, {});
},
clientStream(
stream: ServerReadableStream<any, any>, cb: sendUnaryData<any>) {
stream.on('data', noop);
stream.on('end', () => {
cb(null, {});
});
},
serverStream(stream: ServerWritableStream<any, any>) {
stream.write({});
stream.end();
},
bidiStream(stream: ServerDuplexStream<any, any>) {
stream.on('data', () => {
// Ignore requests
stream.write({});
});
stream.on('end', () => {
stream.end();
});
}
});
server.bindAsync('localhost:0', serverInsecureCreds, (err, port) => {
assert.ifError(err);
client = new testServiceClient(`localhost:${port}`, clientInsecureCreds);
server.start();
done();
});
});
after((done) => {
client.close();
server.tryShutdown(done);
});
it('should get an INTERNAL status with a unary call', (done) => {
client.unary({}, (err: ServiceError, data: any) => {
assert(err);
assert.strictEqual(err.code, grpc.status.INTERNAL);
done();
});
});
it('should get an INTERNAL status with a server stream call', (done) => {
const call = client.serverStream({});
call.on('data', noop);
call.on('error', (err: ServiceError) => {
assert(err);
assert.strictEqual(err.code, grpc.status.INTERNAL);
done();
});
});
});
describe('Other conditions', () => {
let client: ServiceClient;
let server: Server;
let port: number;
before((done) => {
const trailerMetadata = new grpc.Metadata();
server = new Server();
trailerMetadata.add('trailer-present', 'yes');
server.addService(testServiceClient.service, {
unary(call: ServerUnaryCall<any, any>, cb: sendUnaryData<any>) {
const req = call.request;
if (req.error) {
const details = req.message || 'Requested error';
cb({code: grpc.status.UNKNOWN, details} as ServiceError, null,
trailerMetadata);
} else {
cb(null, {count: 1}, trailerMetadata);
}
},
clientStream(
stream: ServerReadableStream<any, any>, cb: sendUnaryData<any>) {
let count = 0;
let errored = false;
stream.on('data', (data: any) => {
if (data.error) {
const message = data.message || 'Requested error';
errored = true;
cb(new Error(message) as ServiceError, null, trailerMetadata);
} else {
count++;
}
});
stream.on('end', () => {
if (!errored) {
cb(null, {count}, trailerMetadata);
}
});
},
serverStream(stream: ServerWritableStream<any, any>) {
const req = stream.request;
if (req.error) {
stream.emit('error', {
code: grpc.status.UNKNOWN,
details: req.message || 'Requested error',
metadata: trailerMetadata
});
} else {
for (let i = 0; i < 5; i++) {
stream.write({count: i});
}
stream.end(trailerMetadata);
}
},
bidiStream(stream: ServerDuplexStream<any, any>) {
let count = 0;
stream.on('data', (data: any) => {
if (data.error) {
const message = data.message || 'Requested error';
const err = new Error(message) as ServiceError;
err.metadata = trailerMetadata.clone();
err.metadata.add('count', '' + count);
stream.emit('error', err);
} else {
stream.write({count});
count++;
}
});
stream.on('end', () => {
stream.end(trailerMetadata);
});
}
});
server.bindAsync('localhost:0', serverInsecureCreds, (err, _port) => {
assert.ifError(err);
port = _port;
client = new testServiceClient(`localhost:${port}`, clientInsecureCreds);
server.start();
done();
});
});
after((done) => {
client.close();
server.tryShutdown(done);
});
describe('Server receiving bad input', () => {
let misbehavingClient: ServiceClient;
const badArg = Buffer.from([0xFF]);
before(() => {
const testServiceAttrs = {
unary: {
path: '/TestService/Unary',
requestStream: false,
responseStream: false,
requestSerialize: identity,
responseDeserialize: identity
},
clientStream: {
path: '/TestService/ClientStream',
requestStream: true,
responseStream: false,
requestSerialize: identity,
responseDeserialize: identity
},
serverStream: {
path: '/TestService/ServerStream',
requestStream: false,
responseStream: true,
requestSerialize: identity,
responseDeserialize: identity
},
bidiStream: {
path: '/TestService/BidiStream',
requestStream: true,
responseStream: true,
requestSerialize: identity,
responseDeserialize: identity
}
} as any;
const client =
grpc.makeGenericClientConstructor(testServiceAttrs, 'TestService');
misbehavingClient = new client(`localhost:${port}`, clientInsecureCreds);
});
after(() => {
misbehavingClient.close();
});
it('should respond correctly to a unary call', (done) => {
misbehavingClient.unary(badArg, (err: ServiceError, data: any) => {
assert(err);
assert.strictEqual(err.code, grpc.status.INTERNAL);
done();
});
});
it('should respond correctly to a server stream', (done) => {
const call = misbehavingClient.serverStream(badArg);
call.on('data', (data: any) => {
assert.fail(data);
});
call.on('error', (err: ServiceError) => {
assert(err);
assert.strictEqual(err.code, grpc.status.INTERNAL);
done();
});
});
});
describe('Trailing metadata', () => {
it('should be present when a unary call succeeds', (done) => {
let count = 0;
const call =
client.unary({error: false}, (err: ServiceError, data: any) => {
assert.ifError(err);
count++;
if (count === 2) {
done();
}
});
call.on('status', (status: grpc.StatusObject) => {
assert.deepStrictEqual(status.metadata.get('trailer-present'), ['yes']);
count++;
if (count === 2) {
done();
}
});
});
it('should be present when a unary call fails', (done) => {
let count = 0;
const call =
client.unary({error: true}, (err: ServiceError, data: any) => {
assert(err);
count++;
if (count === 2) {
done();
}
});
call.on('status', (status: grpc.StatusObject) => {
assert.deepStrictEqual(status.metadata.get('trailer-present'), ['yes']);
count++;
if (count === 2) {
done();
}
});
});
it('should be present when a server stream call succeeds', (done) => {
const call = client.serverStream({error: false});
call.on('data', noop);
call.on('status', (status: grpc.StatusObject) => {
assert.strictEqual(status.code, grpc.status.OK);
assert.deepStrictEqual(status.metadata.get('trailer-present'), ['yes']);
done();
});
});
it('should be present when a server stream call fails', (done) => {
const call = client.serverStream({error: true});
call.on('data', noop);
call.on('error', (error: ServiceError) => {
assert.deepStrictEqual(error.metadata.get('trailer-present'), ['yes']);
done();
});
});
});
describe('Error object should contain the status', () => {
it('for a unary call', (done) => {
client.unary({error: true}, (err: ServiceError, data: any) => {
assert(err);
assert.strictEqual(err.code, grpc.status.UNKNOWN);
assert.strictEqual(err.details, 'Requested error');
done();
});
});
it('for a server stream call', (done) => {
const call = client.serverStream({error: true});
call.on('data', noop);
call.on('error', (error: ServiceError) => {
assert.strictEqual(error.code, grpc.status.UNKNOWN);
assert.strictEqual(error.details, 'Requested error');
done();
});
});
it('for a UTF-8 error message', (done) => {
client.unary(
{error: true, message: '測試字符串'},
(err: ServiceError, data: any) => {
assert(err);
assert.strictEqual(err.code, grpc.status.UNKNOWN);
assert.strictEqual(err.details, '測試字符串');
done();
});
});
});
});
function identity(arg: any): any {
return arg;
}
function noop(): void {}

View File

@ -21,8 +21,12 @@ import * as assert from 'assert';
import * as fs from 'fs';
import * as path from 'path';
import * as grpc from '../src';
import {ServerCredentials} from '../src';
import {ServiceError} from '../src/call';
import {ServiceClient, ServiceClientConstructor} from '../src/make-client';
import {Server} from '../src/server';
import {sendUnaryData, ServerDuplexStream, ServerReadableStream, ServerUnaryCall, ServerWritableStream} from '../src/server-call';
import {loadProtoFile} from './common';
@ -228,4 +232,158 @@ describe('Server', () => {
server.bind('localhost:0', ServerCredentials.createInsecure());
}, /Not implemented. Use bindAsync\(\) instead/);
});
describe('Default handlers', () => {
let server: Server;
let client: ServiceClient;
const mathProtoFile = path.join(__dirname, 'fixtures', 'math.proto');
const mathClient = (loadProtoFile(mathProtoFile).math as any).Math;
const mathServiceAttrs = mathClient.service;
beforeEach((done) => {
server = new Server();
server.addService(mathServiceAttrs, {});
server.bindAsync(
'localhost:0', ServerCredentials.createInsecure(), (err, port) => {
assert.ifError(err);
client = new mathClient(
`localhost:${port}`, grpc.credentials.createInsecure());
server.start();
done();
});
});
it('should respond to a unary call with UNIMPLEMENTED', (done) => {
client.div(
{divisor: 4, dividend: 3}, (error: ServiceError, response: any) => {
assert(error);
assert.strictEqual(error.code, grpc.status.UNIMPLEMENTED);
done();
});
});
it('should respond to a server stream with UNIMPLEMENTED', (done) => {
const call = client.fib({limit: 5});
call.on('data', (value: any) => {
assert.fail('No messages expected');
});
call.on('error', (err: ServiceError) => {
assert(err);
assert.strictEqual(err.code, grpc.status.UNIMPLEMENTED);
done();
});
});
});
});
describe('Echo service', () => {
let server: Server;
let client: ServiceClient;
before((done) => {
const protoFile = path.join(__dirname, 'fixtures', 'echo_service.proto');
const echoService =
loadProtoFile(protoFile).EchoService as ServiceClientConstructor;
server = new Server();
server.addService(echoService.service, {
echo(call: ServerUnaryCall<any, any>, callback: sendUnaryData<any>) {
callback(null, call.request);
}
});
server.bindAsync(
'localhost:0', ServerCredentials.createInsecure(), (err, port) => {
assert.ifError(err);
client = new echoService(
`localhost:${port}`, grpc.credentials.createInsecure());
server.start();
done();
});
});
after((done) => {
client.close();
server.tryShutdown(done);
});
it('should echo the recieved message directly', (done) => {
client.echo(
{value: 'test value', value2: 3},
(error: ServiceError, response: any) => {
assert.ifError(error);
assert.deepStrictEqual(response, {value: 'test value', value2: 3});
done();
});
});
});
describe('Generic client and server', () => {
function toString(val: any) {
return val.toString();
}
function toBuffer(str: string) {
return Buffer.from(str);
}
function capitalize(str: string) {
return str.charAt(0).toUpperCase() + str.slice(1);
}
const stringServiceAttrs = {
capitalize: {
path: '/string/capitalize',
requestStream: false,
responseStream: false,
requestSerialize: toBuffer,
requestDeserialize: toString,
responseSerialize: toBuffer,
responseDeserialize: toString
}
};
describe('String client and server', () => {
let client: ServiceClient;
let server: Server;
before((done) => {
server = new Server();
server.addService(stringServiceAttrs as any, {
capitalize(
call: ServerUnaryCall<any, any>, callback: sendUnaryData<any>) {
callback(null, capitalize(call.request));
}
});
server.bindAsync(
'localhost:0', ServerCredentials.createInsecure(), (err, port) => {
assert.ifError(err);
server.start();
const clientConstr = grpc.makeGenericClientConstructor(
stringServiceAttrs as any,
'unused_but_lets_appease_typescript_anyway');
client = new clientConstr(
`localhost:${port}`, grpc.credentials.createInsecure());
done();
});
});
after((done) => {
client.close();
server.tryShutdown(done);
});
it('Should respond with a capitalized string', (done) => {
client.capitalize('abc', (err: ServiceError, response: string) => {
assert.ifError(err);
assert.strictEqual(response, 'Abc');
done();
});
});
});
});