diff --git a/index.js b/index.js index 8960a352..37c59ed3 100644 --- a/index.js +++ b/index.js @@ -1,116 +1,125 @@ -var path = require('path') - -var resultPath = path.dirname(require.resolve('pg.js')) + '/lib/result' -var Result = require(resultPath) -var Client = require('pg.js').Client +var Result = require('./result') var Cursor = function(text, values) { this.text = text this.values = values - this._connection = null + this.connection = null + this._queue = [] + this.state = 'initialized' + this._result = new Result() + this._cb = null + this._rows = null } -Cursor.prototype._connect = function(cb) { - if(this._connected) return setImmediate(cb); - this._connected = true +Cursor.prototype.submit = function(connection) { + this.connection = connection + + var con = connection var self = this - var client = new Client() - client.connect(function(err) { - if(err) return cb(err); - //remove all listeners from - //client's connection and discard the client - self.connection = client.connection - self.connection.removeAllListeners() + con.parse({ + text: this.text + }, true) - var con = self.connection + con.bind({ + values: this.values + }, true) - con.parse({ - text: self.text - }, true) + con.describe({ + type: 'P', + name: '' //use unamed portal + }, true) - con.bind({ - values: self.values - }, true) - - con.describe({ - type: 'P', - name: '' //use unamed portal - }, true) - - con.flush() - - var onError = function(err) { - cb(err) - con.end() - } - - con.once('error', onError) - - con.on('rowDescription', function(msg) { - self.rowDescription = msg - con.removeListener('error', onError) - cb(null, con) - }) - - var onRow = function(msg) { - var row = self.result.parseRow(msg.fields) - self.result.addRow(row) - } - - con.on('dataRow', onRow) - - con.once('readyForQuery', function() { - con.end() - }) - - con.once('commandComplete', function() { - self._complete = true - con.sync() - }) - }) + con.flush() } -Cursor.prototype._getRows = function(con, n, cb) { - if(this._done) { - return cb(null, [], false) +Cursor.prototype.handleRowDescription = function(msg) { + this._result.addFields(msg.fields) + this.state = 'idle' + if(this._queue.length) { + this._getRows.apply(this, this._queue.shift()) } +} + +Cursor.prototype.handleDataRow = function(msg) { + var row = this._result.parseRow(msg.fields) + this._rows.push(row) +} + +Cursor.prototype._sendRows = function() { + this.state = 'idle' + setImmediate(function() { + this._cb(null, this._rows) + this._rows = [] + }.bind(this)) +} + +Cursor.prototype.handleCommandComplete = function() { + this._sendRows() + this.state = 'done' + this.connection.sync() +} + +Cursor.prototype.handlePortalSuspended = function() { + this._sendRows() +} + +Cursor.prototype.handleReadyForQuery = function() { + +} + +Cursor.prototype.handleError = function(msg) { + this.state = 'error' + this._error = msg + //satisfy any waiting callback + if(this._cb) { + this._cb(msg) + } + //dispatch error to all waiting callbacks + for(var i = 0; i < this._queue.length; i++) { + this._queue.pop()[1](msg) + } +} + +Cursor.prototype._getRows = function(rows, cb) { + console.log('get', rows) + this.state = 'busy' + this._cb = cb + this._rows = [] var msg = { portal: '', - rows: n + rows: rows } - con.execute(msg, true) - con.flush() - this.result = new Result() - this.result.addFields(this.rowDescription.fields) - - var self = this - - var onComplete = function() { - self._done = true - cb(null, self.result.rows, self.result) - } - con.once('commandComplete', onComplete) - - con.once('portalSuspended', function() { - cb(null, self.result.rows, self.result) - con.removeListener('commandComplete', onComplete) - }) + this.connection.execute(msg, true) + this.connection.flush() } Cursor.prototype.end = function(cb) { + if(this.statue != 'initialized') { + this.connection.sync() + } this.connection.end() this.connection.stream.once('end', cb) } Cursor.prototype.read = function(rows, cb) { + console.log('read', rows, this.state) var self = this - this._connect(function(err) { - if(err) return cb(err); - self._getRows(self.connection, rows, cb) - }) + if(this.state == 'idle') { + return this._getRows(rows, cb) + } + if(this.state == 'busy' || this.state == 'initialized') { + return this._queue.push([rows, cb]) + } + if(this.state == 'error') { + return cb(this._error) + } + if(this.state == 'done') { + return cb(null, []) + } + else { + throw new Error("Unknown state: " + this.state) + } } -module.exports = function(query, params) { - return new Cursor(query, params) -} +module.exports = Cursor diff --git a/result.js b/result.js new file mode 100644 index 00000000..3e11636b --- /dev/null +++ b/result.js @@ -0,0 +1,12 @@ +var path = require('path') +var pgPath; +//support both pg & pg.js +//this will eventually go away when i break native bindings +//out into their own module +try { + pgPath = path.dirname(require.resolve('pg')) +} catch(e) { + pgPath = path.dirname(require.resolve('pg.js')) +} + +module.exports = require(path.join(pgPath, 'lib', 'result.js')) diff --git a/test/index.js b/test/index.js index a9d1efe4..89fbf6eb 100644 --- a/test/index.js +++ b/test/index.js @@ -1,86 +1,107 @@ var assert = require('assert') -var pgCursor = require('../') -var gonna = require('gonna') +var Cursor = require('../') +var pg = require('pg.js') var text = 'SELECT generate_series as num FROM generate_series(0, 5)' -var values = [] -it('fetch 6 when asking for 10', function(done) { - var cursor = pgCursor(text) - cursor.read(10, function(err, res) { - assert.ifError(err) - assert.equal(res.length, 6) - done() +describe('cursor', function() { + + var client; + + var pgCursor = function(text, values) { + client.connect() + client.on('drain', client.end.bind(client)) + return client.query(new Cursor(text, values || [])) + } + + before(function() { + client = new pg.Client() }) -}) -it('end before reading to end', function(done) { - var cursor = pgCursor(text) - cursor.read(3, function(err, res) { - assert.equal(res.length, 3) - cursor.end(done) + + after(function() { + client.end() }) -}) -it('callback with error', function(done) { - var cursor = pgCursor('select asdfasdf') - cursor.read(1, function(err) { - assert(err) - done() + it('fetch 6 when asking for 10', function(done) { + var cursor = pgCursor(text) + cursor.read(10, function(err, res) { + assert.ifError(err) + assert.equal(res.length, 6) + done() + }) }) -}) - -it('read a partial chunk of data', function(done) { - var cursor = pgCursor(text) - cursor.read(2, function(err, res) { - assert.equal(res.length, 2) + it('end before reading to end', function(done) { + var cursor = pgCursor(text) cursor.read(3, function(err, res) { + assert.ifError(err) assert.equal(res.length, 3) - cursor.read(1, function(err, res) { - assert.equal(res.length, 1) + cursor.end(done) + }) + }) + + it('callback with error', function(done) { + var cursor = pgCursor('select asdfasdf') + cursor.read(1, function(err) { + assert(err) + done() + }) + }) + + + it('read a partial chunk of data', function(done) { + var cursor = pgCursor(text) + cursor.read(2, function(err, res) { + assert.ifError(err) + assert.equal(res.length, 2) + cursor.read(3, function(err, res) { + assert.equal(res.length, 3) cursor.read(1, function(err, res) { - assert.ifError(err) - assert.strictEqual(res.length, 0) + assert.equal(res.length, 1) + cursor.read(1, function(err, res) { + assert.ifError(err) + assert.strictEqual(res.length, 0) + done() + }) + }) + }) + }) + }) + + it('read return length 0 past the end', function(done) { + var cursor = pgCursor(text) + cursor.read(2, function(err, res) { + cursor.read(100, function(err, res) { + assert.equal(res.length, 4) + cursor.read(100, function(err, res) { + assert.equal(res.length, 0) done() }) }) }) }) -}) -it('read return length 0 past the end', function(done) { - var cursor = pgCursor(text) - cursor.read(2, function(err, res) { - cursor.read(100, function(err, res) { - assert.equal(res.length, 4) - cursor.read(100, function(err, res) { - assert.equal(res.length, 0) - done() + it('read huge result', function(done) { + this.timeout(10000) + var text = 'SELECT generate_series as num FROM generate_series(0, 1000000)' + var values = [] + cursor = pgCursor(text, values); + var count = 0; + var read = function() { + cursor.read(1000, function(err, rows) { + if(err) return done(err); + if(!rows.length) { + assert.equal(count, 1000001) + return done() + } + count += rows.length; + if(count%100000 == 0) { + //console.log(count) + } + setImmediate(read) }) - }) + } + read() }) }) - -it('read huge result', function(done) { - this.timeout(10000) - var text = 'SELECT generate_series as num FROM generate_series(0, 1000000)' - var values = [] - cursor = pgCursor(text, values); - var count = 0; - var read = function() { - cursor.read(1000, function(err, rows) { - if(err) return done(err); - if(!rows.length) { - assert.equal(count, 1000001) - return done() - } - count += rows.length; - if(count%100000 == 0) { - //console.log(count) - } - setImmediate(read) - }) - } - read() -})