Merge pull request #29 from brianc/no-sync-callbacks

Cleanup
This commit is contained in:
Brian C 2017-08-05 17:28:48 -05:00 committed by GitHub
commit 71f30faeda
11 changed files with 224 additions and 161 deletions

9
.eslintrc Normal file
View File

@ -0,0 +1,9 @@
{
"extends": "standard",
"env": {
"mocha": true
},
"rules": {
"no-new-func": "off"
}
}

View File

@ -2,7 +2,9 @@ language: node_js
dist: trusty
sudo: false
node_js:
- "4.2"
- "6"
- "8"
env:
- PGUSER=postgres
services:
@ -10,4 +12,4 @@ services:
addons:
postgresql: "9.6"
before_script:
- psql -c 'create database travis;' -U postgres | true
- psql -c 'create database travis;' -U postgres | true

147
index.js
View File

@ -1,10 +1,11 @@
var Result = require('./pg').Result
var prepare = require('./pg').prepareValue
var EventEmitter = require('events').EventEmitter;
var util = require('util');
'use strict'
const Result = require('./pg').Result
const prepare = require('./pg').prepareValue
const EventEmitter = require('events').EventEmitter
const util = require('util')
function Cursor (text, values) {
EventEmitter.call(this);
EventEmitter.call(this)
this.text = text
this.values = values ? values.map(prepare) : null
@ -18,11 +19,10 @@ function Cursor (text, values) {
util.inherits(Cursor, EventEmitter)
Cursor.prototype.submit = function(connection) {
Cursor.prototype.submit = function (connection) {
this.connection = connection
var con = connection
var self = this
const con = connection
con.parse({
text: this.text
@ -34,101 +34,99 @@ Cursor.prototype.submit = function(connection) {
con.describe({
type: 'P',
name: '' //use unamed portal
name: '' // use unamed portal
}, true)
con.flush()
const ifNoData = () => {
this.state = 'idle'
this._shiftQueue()
}
con.once('noData', ifNoData)
con.once('rowDescription', function () {
con.removeListener('noData', ifNoData);
});
function ifNoData () {
self.state = 'idle'
self._shiftQueue();
}
con.removeListener('noData', ifNoData)
})
}
Cursor.prototype._shiftQueue = function () {
if(this._queue.length) {
if (this._queue.length) {
this._getRows.apply(this, this._queue.shift())
}
}
Cursor.prototype.handleRowDescription = function(msg) {
Cursor.prototype.handleRowDescription = function (msg) {
this._result.addFields(msg.fields)
this.state = 'idle'
this._shiftQueue();
this._shiftQueue()
}
Cursor.prototype.handleDataRow = function(msg) {
var row = this._result.parseRow(msg.fields)
Cursor.prototype.handleDataRow = function (msg) {
const row = this._result.parseRow(msg.fields)
this.emit('row', row, this._result)
this._rows.push(row)
}
Cursor.prototype._sendRows = function() {
Cursor.prototype._sendRows = function () {
this.state = 'idle'
setImmediate(function() {
var cb = this._cb
//remove callback before calling it
//because likely a new one will be added
//within the call to this callback
setImmediate(() => {
const cb = this._cb
// remove callback before calling it
// because likely a new one will be added
// within the call to this callback
this._cb = null
if(cb) {
if (cb) {
this._result.rows = this._rows
cb(null, this._rows, this._result)
}
this._rows = []
}.bind(this))
})
}
Cursor.prototype.handleCommandComplete = function() {
Cursor.prototype.handleCommandComplete = function () {
this.connection.sync()
}
Cursor.prototype.handlePortalSuspended = function() {
Cursor.prototype.handlePortalSuspended = function () {
this._sendRows()
}
Cursor.prototype.handleReadyForQuery = function() {
Cursor.prototype.handleReadyForQuery = function () {
this._sendRows()
this.emit('end', this._result)
this.state = 'done'
}
Cursor.prototype.handleEmptyQuery = function(con) {
if (con.sync) {
con.sync()
}
};
Cursor.prototype.handleError = function(msg) {
this.state = 'error'
this._error = msg
//satisfy any waiting callback
if(this._cb) {
this._cb(msg)
}
//dispatch error to all waiting callbacks
for(var i = 0; i < this._queue.length; i++) {
this._queue.pop()[1](msg)
}
if (this.eventNames().indexOf('error') >= 0) {
//only dispatch error events if we have a listener
this.emit('error', msg)
}
//call sync to keep this connection from hanging
Cursor.prototype.handleEmptyQuery = function () {
this.connection.sync()
}
Cursor.prototype._getRows = function(rows, cb) {
Cursor.prototype.handleError = function (msg) {
this.state = 'error'
this._error = msg
// satisfy any waiting callback
if (this._cb) {
this._cb(msg)
}
// dispatch error to all waiting callbacks
for (var i = 0; i < this._queue.length; i++) {
this._queue.pop()[1](msg)
}
if (this.listenerCount('error') > 0) {
// only dispatch error events if we have a listener
this.emit('error', msg)
}
// call sync to keep this connection from hanging
this.connection.sync()
}
Cursor.prototype._getRows = function (rows, cb) {
this.state = 'busy'
this._cb = cb
this._rows = []
var msg = {
const msg = {
portal: '',
rows: rows
}
@ -136,44 +134,43 @@ Cursor.prototype._getRows = function(rows, cb) {
this.connection.flush()
}
Cursor.prototype.end = function(cb) {
if(this.state != 'initialized') {
Cursor.prototype.end = function (cb) {
if (this.state !== 'initialized') {
this.connection.sync()
}
this.connection.end()
this.connection.stream.once('end', cb)
console.log('calling end on connection')
this.connection.end()
}
Cursor.prototype.close = function(cb) {
if (this.state == 'done') {
Cursor.prototype.close = function (cb) {
if (this.state === 'done') {
return setImmediate(cb)
}
this.connection.close({type: 'P'})
this.connection.sync()
this.state = 'done'
if(cb) {
this.connection.once('closeComplete', function() {
if (cb) {
this.connection.once('closeComplete', function () {
cb()
})
}
}
Cursor.prototype.read = function(rows, cb) {
var self = this
if(this.state == 'idle') {
Cursor.prototype.read = function (rows, cb) {
if (this.state === 'idle') {
return this._getRows(rows, cb)
}
if(this.state == 'busy' || this.state == 'initialized') {
if (this.state === 'busy' || this.state === 'initialized') {
return this._queue.push([rows, cb])
}
if(this.state == 'error') {
return cb(this._error)
if (this.state === 'error') {
return setImmediate(() => cb(this._error))
}
if(this.state == 'done') {
return cb(null, [])
}
else {
throw new Error("Unknown state: " + this.state)
if (this.state === 'done') {
return setImmediate(() => cb(null, []))
} else {
throw new Error('Unknown state: ' + this.state)
}
}

View File

@ -7,13 +7,19 @@
"test": "test"
},
"scripts": {
"test": "mocha test/"
"test": " mocha && eslint ."
},
"author": "Brian M. Carlson",
"license": "MIT",
"devDependencies": {
"pg": "~6.0.0",
"mocha": "~1.17.1"
"eslint": "^4.4.0",
"eslint-config-standard": "^10.2.1",
"eslint-plugin-import": "^2.7.0",
"eslint-plugin-node": "^5.1.1",
"eslint-plugin-promise": "^3.5.0",
"eslint-plugin-standard": "^3.0.1",
"mocha": "^3.5.0",
"pg": "~6.0.0"
},
"dependencies": {}
}

8
pg.js
View File

@ -1,10 +1,10 @@
//support both pg & pg.js
//this will eventually go away when i break native bindings
//out into their own module
// support both pg & pg.js
// this will eventually go away when i break native bindings
// out into their own module
try {
module.exports.Result = require('pg/lib/result.js')
module.exports.prepareValue = require('pg/lib/utils.js').prepareValue
} catch(e) {
} catch (e) {
module.exports.Result = require('pg.js/lib/result.js')
module.exports.prepareValue = require('pg.js/lib/utils.js').prepareValue
}

View File

@ -3,30 +3,30 @@ var Cursor = require('../')
var pg = require('pg')
var text = 'SELECT generate_series as num FROM generate_series(0, 50)'
describe('close', function() {
beforeEach(function(done) {
describe('close', function () {
beforeEach(function (done) {
var client = this.client = new pg.Client()
client.connect(done)
client.on('drain', client.end.bind(client))
})
it('closes cursor early', function(done) {
it('closes cursor early', function (done) {
var cursor = new Cursor(text)
this.client.query(cursor)
this.client.query('SELECT NOW()', done)
cursor.read(25, function(err, res) {
cursor.read(25, function (err, res) {
assert.ifError(err)
cursor.close()
})
})
it('works with callback style', function(done) {
it('works with callback style', function (done) {
var cursor = new Cursor(text)
var client = this.client
client.query(cursor)
cursor.read(25, function(err, res) {
cursor.read(25, function (err, res) {
assert.ifError(err)
cursor.close(function(err) {
cursor.close(function (err) {
assert.ifError(err)
client.query('SELECT NOW()', done)
})

View File

@ -1,17 +1,18 @@
'use strict'
var assert = require('assert')
var Cursor = require('../')
var pg = require('pg')
var text = 'SELECT generate_series as num FROM generate_series(0, 4)'
describe('error handling', function() {
it('can continue after error', function(done) {
describe('error handling', function () {
it('can continue after error', function (done) {
var client = new pg.Client()
client.connect()
var cursor = client.query(new Cursor('asdfdffsdf'))
cursor.read(1, function(err) {
cursor.read(1, function (err) {
assert(err)
client.query('SELECT NOW()', function(err, res) {
client.query('SELECT NOW()', function (err, res) {
assert.ifError(err)
client.end()
done()
@ -20,16 +21,61 @@ describe('error handling', function() {
})
})
describe('proper cleanup', function() {
it('can issue multiple cursors on one client', function(done) {
describe('read callback does not fire sync', () => {
it('does not fire error callback sync', (done) => {
var client = new pg.Client()
client.connect()
var cursor = client.query(new Cursor('asdfdffsdf'))
let after = false
cursor.read(1, function (err) {
assert(err, 'error should be returned')
assert.equal(after, true, 'should not call read sync')
after = false
cursor.read(1, function (err) {
assert(err, 'error should be returned')
assert.equal(after, true, 'should not call read sync')
client.end()
done()
})
after = true
})
after = true
})
it('does not fire result sync after finished', (done) => {
var client = new pg.Client()
client.connect()
var cursor = client.query(new Cursor('SELECT NOW()'))
let after = false
cursor.read(1, function (err) {
assert(!err)
assert.equal(after, true, 'should not call read sync')
cursor.read(1, function (err) {
assert(!err)
after = false
cursor.read(1, function (err) {
assert(!err)
assert.equal(after, true, 'should not call read sync')
client.end()
done()
})
after = true
})
})
after = true
})
})
describe('proper cleanup', function () {
it('can issue multiple cursors on one client', function (done) {
var client = new pg.Client()
client.connect()
var cursor1 = client.query(new Cursor(text))
cursor1.read(8, function(err, rows) {
cursor1.read(8, function (err, rows) {
assert.ifError(err)
assert.equal(rows.length, 5)
cursor2 = client.query(new Cursor(text))
cursor2.read(8, function(err, rows) {
var cursor2 = client.query(new Cursor(text))
cursor2.read(8, function (err, rows) {
assert.ifError(err)
assert.equal(rows.length, 5)
client.end()

View File

@ -4,60 +4,60 @@ var pg = require('pg')
var text = 'SELECT generate_series as num FROM generate_series(0, 5)'
describe('cursor', function() {
beforeEach(function(done) {
describe('cursor', function () {
beforeEach(function (done) {
var client = this.client = new pg.Client()
client.connect(done)
this.pgCursor = function(text, values) {
this.pgCursor = function (text, values) {
client.on('drain', client.end.bind(client))
return client.query(new Cursor(text, values || []))
}
})
afterEach(function() {
afterEach(function () {
this.client.end()
})
it('fetch 6 when asking for 10', function(done) {
it('fetch 6 when asking for 10', function (done) {
var cursor = this.pgCursor(text)
cursor.read(10, function(err, res) {
cursor.read(10, function (err, res) {
assert.ifError(err)
assert.equal(res.length, 6)
done()
})
})
it('end before reading to end', function(done) {
it('end before reading to end', function (done) {
var cursor = this.pgCursor(text)
cursor.read(3, function(err, res) {
cursor.read(3, function (err, res) {
assert.ifError(err)
assert.equal(res.length, 3)
cursor.end(done)
})
})
it('callback with error', function(done) {
it('callback with error', function (done) {
var cursor = this.pgCursor('select asdfasdf')
cursor.read(1, function(err) {
cursor.read(1, function (err) {
assert(err)
done()
})
})
it('read a partial chunk of data', function(done) {
it('read a partial chunk of data', function (done) {
var cursor = this.pgCursor(text)
cursor.read(2, function(err, res) {
cursor.read(2, function (err, res) {
assert.ifError(err)
assert.equal(res.length, 2)
cursor.read(3, function(err, res) {
cursor.read(3, function (err, res) {
assert(!err)
assert.equal(res.length, 3)
cursor.read(1, function(err, res) {
cursor.read(1, function (err, res) {
assert(!err)
assert.equal(res.length, 1)
cursor.read(1, function(err, res) {
cursor.read(1, function (err, res) {
assert(!err)
assert.ifError(err)
assert.strictEqual(res.length, 0)
done()
@ -67,12 +67,15 @@ describe('cursor', function() {
})
})
it('read return length 0 past the end', function(done) {
it('read return length 0 past the end', function (done) {
var cursor = this.pgCursor(text)
cursor.read(2, function(err, res) {
cursor.read(100, function(err, res) {
cursor.read(2, function (err, res) {
assert(!err)
cursor.read(100, function (err, res) {
assert(!err)
assert.equal(res.length, 4)
cursor.read(100, function(err, res) {
cursor.read(100, function (err, res) {
assert(!err)
assert.equal(res.length, 0)
done()
})
@ -80,22 +83,22 @@ describe('cursor', function() {
})
})
it('read huge result', function(done) {
it('read huge result', function (done) {
this.timeout(10000)
var text = 'SELECT generate_series as num FROM generate_series(0, 100000)'
var values = []
var cursor = this.pgCursor(text, values);
var count = 0;
var read = function() {
cursor.read(100, function(err, rows) {
if(err) return done(err);
if(!rows.length) {
var cursor = this.pgCursor(text, values)
var count = 0
var read = function () {
cursor.read(100, function (err, rows) {
if (err) return done(err)
if (!rows.length) {
assert.equal(count, 100001)
return done()
}
count += rows.length;
if(count%10000 == 0) {
//console.log(count)
count += rows.length
if (count % 10000 === 0) {
// console.log(count)
}
setImmediate(read)
})
@ -103,23 +106,24 @@ describe('cursor', function() {
read()
})
it('normalizes parameter values', function(done) {
it('normalizes parameter values', function (done) {
var text = 'SELECT $1::json me'
var values = [{name: 'brian'}]
var cursor = this.pgCursor(text, values);
cursor.read(1, function(err, rows) {
if(err) return done(err);
var values = [{ name: 'brian' }]
var cursor = this.pgCursor(text, values)
cursor.read(1, function (err, rows) {
if (err) return done(err)
assert.equal(rows[0].me.name, 'brian')
cursor.read(1, function(err, rows) {
cursor.read(1, function (err, rows) {
assert(!err)
assert.equal(rows.length, 0)
done()
})
})
})
it('returns result along with rows', function(done) {
it('returns result along with rows', function (done) {
var cursor = this.pgCursor(text)
cursor.read(1, function(err, rows, result) {
cursor.read(1, function (err, rows, result) {
assert.ifError(err)
assert.equal(rows.length, 1)
assert.strictEqual(rows, result.rows)
@ -128,7 +132,7 @@ describe('cursor', function() {
})
})
it('emits row events', function(done) {
it('emits row events', function (done) {
var cursor = this.pgCursor(text)
cursor.read(10)
cursor.on('row', (row, result) => result.addRow(row))
@ -138,7 +142,7 @@ describe('cursor', function() {
})
})
it('emits row events when cursor is closed manually', function(done) {
it('emits row events when cursor is closed manually', function (done) {
var cursor = this.pgCursor(text)
cursor.on('row', (row, result) => result.addRow(row))
cursor.on('end', (result) => {
@ -149,9 +153,9 @@ describe('cursor', function() {
cursor.read(3, () => cursor.close())
})
it('emits error events', function(done) {
it('emits error events', function (done) {
var cursor = this.pgCursor('select asdfasdf')
cursor.on('error', function(err) {
cursor.on('error', function (err) {
assert(err)
done()
})

View File

@ -1,3 +1,3 @@
--reporter spec
--no-exit
--bail
--reporter=spec

View File

@ -1,15 +1,14 @@
var assert = require('assert')
var pg = require('pg');
var Cursor = require('../');
var pg = require('pg')
var Cursor = require('../')
describe('queries with no data', function () {
beforeEach(function(done) {
beforeEach(function (done) {
var client = this.client = new pg.Client()
client.connect(done)
})
afterEach(function() {
afterEach(function () {
this.client.end()
})
@ -21,7 +20,7 @@ describe('queries with no data', function () {
assert.equal(rows.length, 0)
done()
})
});
})
it('handles empty query', function (done) {
var cursor = new Cursor('-- this is a comment')
@ -32,5 +31,4 @@ describe('queries with no data', function () {
done()
})
})
});
})

View File

@ -1,10 +1,11 @@
'use strict'
const assert = require('assert')
const Cursor = require('../')
const pg = require('pg')
const text = 'SELECT generate_series as num FROM generate_series(0, 50)'
function poolQueryPromise(pool, readRowCount) {
function poolQueryPromise (pool, readRowCount) {
return new Promise((resolve, reject) => {
pool.connect((err, client, done) => {
if (err) {
@ -30,16 +31,16 @@ function poolQueryPromise(pool, readRowCount) {
})
}
describe('pool', function() {
beforeEach(function() {
describe('pool', function () {
beforeEach(function () {
this.pool = new pg.Pool({max: 1})
})
afterEach(function() {
afterEach(function () {
this.pool.end()
})
it('closes cursor early, single pool query', function(done) {
it('closes cursor early, single pool query', function (done) {
poolQueryPromise(this.pool, 25)
.then(() => done())
.catch(err => {
@ -48,7 +49,7 @@ describe('pool', function() {
})
})
it('closes cursor early, saturated pool', function(done) {
it('closes cursor early, saturated pool', function (done) {
const promises = []
for (let i = 0; i < 10; i++) {
promises.push(poolQueryPromise(this.pool, 25))
@ -61,7 +62,7 @@ describe('pool', function() {
})
})
it('closes exhausted cursor, single pool query', function(done) {
it('closes exhausted cursor, single pool query', function (done) {
poolQueryPromise(this.pool, 100)
.then(() => done())
.catch(err => {
@ -70,7 +71,7 @@ describe('pool', function() {
})
})
it('closes exhausted cursor, saturated pool', function(done) {
it('closes exhausted cursor, saturated pool', function (done) {
const promises = []
for (let i = 0; i < 10; i++) {
promises.push(poolQueryPromise(this.pool, 100))
@ -82,4 +83,4 @@ describe('pool', function() {
done()
})
})
})
})