From 02bcc9d97af454b56106fd080146e5c8e1828586 Mon Sep 17 00:00:00 2001 From: "Brian M. Carlson" Date: Tue, 13 Jun 2017 12:54:07 -0500 Subject: [PATCH] Start working on promsie tests --- lib/client.js | 133 +++++++++++-------- test/integration/client/promise-api-tests.js | 105 +++++++++++++++ 2 files changed, 182 insertions(+), 56 deletions(-) create mode 100644 test/integration/client/promise-api-tests.js diff --git a/lib/client.js b/lib/client.js index 32ea76b9..3a8515a4 100644 --- a/lib/client.js +++ b/lib/client.js @@ -108,56 +108,21 @@ Client.prototype.connect = function(callback) { self.secretKey = msg.secretKey; }); + + con.on('readyForQuery', function() { + var activeQuery = self.activeQuery; + self.activeQuery = null; + self.readyForQuery = true; + self._pulseQueryQueue(); + if(activeQuery) { + activeQuery.handleReadyForQuery(con); + } + }); + //hook up query handling events to connection //after the connection initially becomes ready for queries con.once('readyForQuery', function() { - self._connecting = false; - - //delegate rowDescription to active query - con.on('rowDescription', function(msg) { - self.activeQuery.handleRowDescription(msg); - }); - - //delegate dataRow to active query - con.on('dataRow', function(msg) { - self.activeQuery.handleDataRow(msg); - }); - - //delegate portalSuspended to active query - con.on('portalSuspended', function(msg) { - self.activeQuery.handlePortalSuspended(con); - }); - - //deletagate emptyQuery to active query - con.on('emptyQuery', function(msg) { - self.activeQuery.handleEmptyQuery(con); - }); - - //delegate commandComplete to active query - con.on('commandComplete', function(msg) { - self.activeQuery.handleCommandComplete(msg, con); - }); - - //if a prepared statement has a name and properly parses - //we track that its already been executed so we don't parse - //it again on the same client - con.on('parseComplete', function(msg) { - if(self.activeQuery.name) { - con.parsedStatements[self.activeQuery.name] = true; - } - }); - - con.on('copyInResponse', function(msg) { - self.activeQuery.handleCopyInResponse(self.connection); - }); - - con.on('copyData', function (msg) { - self.activeQuery.handleCopyData(msg, self.connection); - }); - - con.on('notification', function(msg) { - self.emit('notification', msg); - }); + self._attachEventListeners(con) //process possible callback argument to Client#connect if (callback) { @@ -169,15 +134,15 @@ Client.prototype.connect = function(callback) { self.emit('connect'); }); - con.on('readyForQuery', function() { - var activeQuery = self.activeQuery; - self.activeQuery = null; - self.readyForQuery = true; - self._pulseQueryQueue(); - if(activeQuery) { - activeQuery.handleReadyForQuery(con); - } - }); + if (!callback) { + return new global.Promise(function (resolve, reject) { + con.once('connect', () => { + con.removeListener('error', reject) + resolve() + }) + con.once('error', reject) + }) + } con.on('error', function(error) { if(this.activeQuery) { @@ -234,6 +199,58 @@ Client.prototype.connect = function(callback) { }; +// once a connection is established connect listeners +Client.prototype._attachEventListeners = function(con) { + var self = this; + self._connecting = false; + + //delegate rowDescription to active query + con.on('rowDescription', function(msg) { + self.activeQuery.handleRowDescription(msg); + }); + + //delegate dataRow to active query + con.on('dataRow', function(msg) { + self.activeQuery.handleDataRow(msg); + }); + + //delegate portalSuspended to active query + con.on('portalSuspended', function(msg) { + self.activeQuery.handlePortalSuspended(con); + }); + + //deletagate emptyQuery to active query + con.on('emptyQuery', function(msg) { + self.activeQuery.handleEmptyQuery(con); + }); + + //delegate commandComplete to active query + con.on('commandComplete', function(msg) { + self.activeQuery.handleCommandComplete(msg, con); + }); + + //if a prepared statement has a name and properly parses + //we track that its already been executed so we don't parse + //it again on the same client + con.on('parseComplete', function(msg) { + if(self.activeQuery.name) { + con.parsedStatements[self.activeQuery.name] = true; + } + }); + + con.on('copyInResponse', function(msg) { + self.activeQuery.handleCopyInResponse(self.connection); + }); + + con.on('copyData', function (msg) { + self.activeQuery.handleCopyData(msg, self.connection); + }); + + con.on('notification', function(msg) { + self.emit('notification', msg); + }); +} + Client.prototype.getStartupConf = function() { var params = this.connectionParameters; @@ -391,6 +408,10 @@ Client.prototype.end = function(cb) { this.connection.end(); if (cb) { this.connection.once('end', cb); + } else { + return new global.Promise((resolve) => { + this.connection.once('end', resolve); + }); } }; diff --git a/test/integration/client/promise-api-tests.js b/test/integration/client/promise-api-tests.js new file mode 100644 index 00000000..783fb46c --- /dev/null +++ b/test/integration/client/promise-api-tests.js @@ -0,0 +1,105 @@ +const async = require('async') +const helper = require('./test-helper') +const pg = helper.pg; + +class Test { + constructor(name, cb) { + this.name = name + this.action = cb + this.timeout = 5000 + } + + run(cb) { + try { + this._run(cb) + } catch (e) { + cb(e) + } + } + + _run(cb) { + if (!this.action) { + console.log(`${this.name} skipped`) + return cb() + } + if (!this.action.length) { + const result = this.action.call(this) + if ((result || 0).then) { + result + .then(() => cb()) + .catch(err => cb(err || new Error('Unhandled promise rejection'))) + } + } else { + this.action.call(this, cb) + } + } +} + +class Suite { + constructor() { + console.log('') + this._queue = async.queue(this.run.bind(this), 1) + this._queue.drain = () => { } + } + + run(test, cb) { + const tid = setTimeout(() => { + const err = Error(`test: ${test.name} did not complete withint ${test.timeout}ms`) + cb(err) + }, test.timeout) + test.run((err) => { + clearTimeout(tid) + if (err) { + console.log(test.name + ' FAILED!', err.stack) + } else { + console.log(test.name) + } + cb(err) + }) + } + + test(name, cb) { + this._queue.push(new Test(name, cb)) + } +} + +const suite = new Suite() + +suite.test('valid connection completes promise', () => { + const client = new pg.Client() + return client.connect() + .then(() => { + return client.end() + .then(() => { }) + }) +}) + +suite.test('valid connection completes promise', () => { + const client = new pg.Client() + return client.connect() + .then(() => { + return client.end() + .then(() => { }) + }) +}) + + +suite.test('invalid connection rejects promise', (done) => { + const client = new pg.Client({ host: 'alksdjflaskdfj' }) + return client.connect() + .catch(e => { + assert(e instanceof Error) + done() + }) +}) + +suite.test('connected client does not reject promise after', (done) => { + const client = new pg.Client() + return client.connect() + .then(() => { + setTimeout(() => { + // manually kill the connection + client.connection.stream.end() + }, 50) + }) +})