Start working on promsie tests

This commit is contained in:
Brian M. Carlson 2017-06-13 12:54:07 -05:00 committed by Brian C
parent bc2f550402
commit 02bcc9d97a
2 changed files with 182 additions and 56 deletions

View File

@ -108,56 +108,21 @@ Client.prototype.connect = function(callback) {
self.secretKey = msg.secretKey;
});
con.on('readyForQuery', function() {
var activeQuery = self.activeQuery;
self.activeQuery = null;
self.readyForQuery = true;
self._pulseQueryQueue();
if(activeQuery) {
activeQuery.handleReadyForQuery(con);
}
});
//hook up query handling events to connection
//after the connection initially becomes ready for queries
con.once('readyForQuery', function() {
self._connecting = false;
//delegate rowDescription to active query
con.on('rowDescription', function(msg) {
self.activeQuery.handleRowDescription(msg);
});
//delegate dataRow to active query
con.on('dataRow', function(msg) {
self.activeQuery.handleDataRow(msg);
});
//delegate portalSuspended to active query
con.on('portalSuspended', function(msg) {
self.activeQuery.handlePortalSuspended(con);
});
//deletagate emptyQuery to active query
con.on('emptyQuery', function(msg) {
self.activeQuery.handleEmptyQuery(con);
});
//delegate commandComplete to active query
con.on('commandComplete', function(msg) {
self.activeQuery.handleCommandComplete(msg, con);
});
//if a prepared statement has a name and properly parses
//we track that its already been executed so we don't parse
//it again on the same client
con.on('parseComplete', function(msg) {
if(self.activeQuery.name) {
con.parsedStatements[self.activeQuery.name] = true;
}
});
con.on('copyInResponse', function(msg) {
self.activeQuery.handleCopyInResponse(self.connection);
});
con.on('copyData', function (msg) {
self.activeQuery.handleCopyData(msg, self.connection);
});
con.on('notification', function(msg) {
self.emit('notification', msg);
});
self._attachEventListeners(con)
//process possible callback argument to Client#connect
if (callback) {
@ -169,15 +134,15 @@ Client.prototype.connect = function(callback) {
self.emit('connect');
});
con.on('readyForQuery', function() {
var activeQuery = self.activeQuery;
self.activeQuery = null;
self.readyForQuery = true;
self._pulseQueryQueue();
if(activeQuery) {
activeQuery.handleReadyForQuery(con);
}
});
if (!callback) {
return new global.Promise(function (resolve, reject) {
con.once('connect', () => {
con.removeListener('error', reject)
resolve()
})
con.once('error', reject)
})
}
con.on('error', function(error) {
if(this.activeQuery) {
@ -234,6 +199,58 @@ Client.prototype.connect = function(callback) {
};
// once a connection is established connect listeners
Client.prototype._attachEventListeners = function(con) {
var self = this;
self._connecting = false;
//delegate rowDescription to active query
con.on('rowDescription', function(msg) {
self.activeQuery.handleRowDescription(msg);
});
//delegate dataRow to active query
con.on('dataRow', function(msg) {
self.activeQuery.handleDataRow(msg);
});
//delegate portalSuspended to active query
con.on('portalSuspended', function(msg) {
self.activeQuery.handlePortalSuspended(con);
});
//deletagate emptyQuery to active query
con.on('emptyQuery', function(msg) {
self.activeQuery.handleEmptyQuery(con);
});
//delegate commandComplete to active query
con.on('commandComplete', function(msg) {
self.activeQuery.handleCommandComplete(msg, con);
});
//if a prepared statement has a name and properly parses
//we track that its already been executed so we don't parse
//it again on the same client
con.on('parseComplete', function(msg) {
if(self.activeQuery.name) {
con.parsedStatements[self.activeQuery.name] = true;
}
});
con.on('copyInResponse', function(msg) {
self.activeQuery.handleCopyInResponse(self.connection);
});
con.on('copyData', function (msg) {
self.activeQuery.handleCopyData(msg, self.connection);
});
con.on('notification', function(msg) {
self.emit('notification', msg);
});
}
Client.prototype.getStartupConf = function() {
var params = this.connectionParameters;
@ -391,6 +408,10 @@ Client.prototype.end = function(cb) {
this.connection.end();
if (cb) {
this.connection.once('end', cb);
} else {
return new global.Promise((resolve) => {
this.connection.once('end', resolve);
});
}
};

View File

@ -0,0 +1,105 @@
const async = require('async')
const helper = require('./test-helper')
const pg = helper.pg;
class Test {
constructor(name, cb) {
this.name = name
this.action = cb
this.timeout = 5000
}
run(cb) {
try {
this._run(cb)
} catch (e) {
cb(e)
}
}
_run(cb) {
if (!this.action) {
console.log(`${this.name} skipped`)
return cb()
}
if (!this.action.length) {
const result = this.action.call(this)
if ((result || 0).then) {
result
.then(() => cb())
.catch(err => cb(err || new Error('Unhandled promise rejection')))
}
} else {
this.action.call(this, cb)
}
}
}
class Suite {
constructor() {
console.log('')
this._queue = async.queue(this.run.bind(this), 1)
this._queue.drain = () => { }
}
run(test, cb) {
const tid = setTimeout(() => {
const err = Error(`test: ${test.name} did not complete withint ${test.timeout}ms`)
cb(err)
}, test.timeout)
test.run((err) => {
clearTimeout(tid)
if (err) {
console.log(test.name + ' FAILED!', err.stack)
} else {
console.log(test.name)
}
cb(err)
})
}
test(name, cb) {
this._queue.push(new Test(name, cb))
}
}
const suite = new Suite()
suite.test('valid connection completes promise', () => {
const client = new pg.Client()
return client.connect()
.then(() => {
return client.end()
.then(() => { })
})
})
suite.test('valid connection completes promise', () => {
const client = new pg.Client()
return client.connect()
.then(() => {
return client.end()
.then(() => { })
})
})
suite.test('invalid connection rejects promise', (done) => {
const client = new pg.Client({ host: 'alksdjflaskdfj' })
return client.connect()
.catch(e => {
assert(e instanceof Error)
done()
})
})
suite.test('connected client does not reject promise after', (done) => {
const client = new pg.Client()
return client.connect()
.then(() => {
setTimeout(() => {
// manually kill the connection
client.connection.stream.end()
}, 50)
})
})