diff --git a/index.js b/index.js index 879bcc09..f50f0af9 100644 --- a/index.js +++ b/index.js @@ -1,111 +1,46 @@ -var assert = require('assert') +var util = require('util') +var Cursor = require('pg-cursor') var Readable = require('stream').Readable -var path = require('path') - -var pgdir = false -try { - pgdir = path.dirname(require.resolve('pg')) -} catch (e) { - pgdir = path.dirname(require.resolve('pg.js')) - if(!pgdir) { - throw new Error("Please install either `pg` or `pg.js` to use this module") - } - pgdir = path.join(pgdir, 'lib') -} -var Result = require(path.join(pgdir, 'result')) -var utils = require(path.join(pgdir, 'utils')) - var QueryStream = module.exports = function(text, values, options) { options = options || { } + Cursor.call(this, text, values) Readable.call(this, { objectMode: true, highWaterMark: options.highWaterMark || 1000 }) - this.text = text - assert(this.text, 'text cannot be falsy') - this.values = (values || []).map(utils.prepareValue) - this.name = '' - this._result = new Result() this.batchSize = options.batchSize || 100 - this._idle = true + this._ready = false + //kick reader + this.read() } -require('util').inherits(QueryStream, Readable) - -QueryStream.prototype._read = function(n) { - this._getRows(n) -} - -QueryStream.prototype._getRows = function(count) { - var con = this.connection - if(!this._idle || !this.connection) return; - this._idle = false - con.execute({ - portal: '', - rows: count - }, true) - - con.flush() -} - -QueryStream.prototype.submit = function(con) { - //save reference to connection - this.connection = con - - var name = this.name - - con.parse({ - text: this.text, - name: name, - types: [] - }, true) - - con.bind({ - portal: '', - statement: name, - values: this.values, - binary: false - }, true) - - con.describe({ - type: 'P', - name: name - }, true) - - this._getRows(this.batchSize) - -} - -QueryStream.prototype.handleRowDescription = function(msg) { - this._result.addFields(msg.fields) -} - -QueryStream.prototype.handleDataRow = function(msg) { - var row = this._result.parseRow(msg.fields) - this._more = this.push(row) -} - -QueryStream.prototype.handlePortalSuspended = function(msg) { - this._idle = true - if(this._more) { - this._getRows(this.batchSize) +util.inherits(QueryStream, Readable) +for(var key in Cursor.prototype) { + if(key != 'read') { + QueryStream.prototype[key] = Cursor.prototype[key] } } -QueryStream.prototype.handleCommandComplete = function(msg) { - this.connection.sync() -} +QueryStream.prototype._fetch = Cursor.prototype.read -QueryStream.prototype.handleReadyForQuery = function() { - this.push(null) - //ensure 'close' fires after end - setImmediate(function() { - this.emit('close') - }.bind(this)) -} - -QueryStream.prototype.handleError = function(err) { - this.connection.sync() - this.emit('error', err) +QueryStream.prototype._read = function(n) { + if(this._reading) return false; + this._reading = true + var self = this + this._fetch(this.batchSize, function(err, rows) { + if(err) { + return self.emit('error', err) + } + if(!rows.length) { + setImmediate(function() { + self.push(null) + self.once('end', self.emit.bind(self, 'close')) + }) + } + self._reading = false + for(var i = 0; i < rows.length; i++) { + self.push(rows[i]) + } + }) } diff --git a/package.json b/package.json index cddd7024..1b13fba5 100644 --- a/package.json +++ b/package.json @@ -23,14 +23,16 @@ }, "devDependencies": { "pg.js": "~2.8.0", - "gonna": "0.0.0", "lodash": "~2.2.1", "concat-stream": "~1.0.1", "through": "~2.3.4", "stream-tester": "0.0.5", "stream-spec": "~0.3.5", - "jsonstream": "0.0.1", + "jsonstream": "*", "JSONStream": "~0.7.1", "mocha": "~1.17.1" + }, + "dependencies": { + "pg-cursor": "~0.1.2" } } diff --git a/test/close.js b/test/close.js index fb812960..6fa536c8 100644 --- a/test/close.js +++ b/test/close.js @@ -7,8 +7,9 @@ var QueryStream = require('../') require('./helper')(function(client) { it('emits close', function(done) { - var stream = new QueryStream('SELECT * FROM generate_series(0, $1) num', [3], {chunkSize: 2, highWaterMark: 2}) + var stream = new QueryStream('SELECT * FROM generate_series(0, $1) num', [3], {batchSize: 2, highWaterMark: 2}) var query = client.query(stream) + query.pipe(concat(function() {})) query.on('close', done) }) }) diff --git a/test/fast-reader.js b/test/fast-reader.js index 0bb395e8..808abd44 100644 --- a/test/fast-reader.js +++ b/test/fast-reader.js @@ -7,10 +7,13 @@ helper(function(client) { var stream = new QueryStream('SELECT * FROM generate_series(0, 200) num', []) var query = client.query(stream) var result = [] + var count = 0 stream.on('readable', function() { var res = stream.read() assert(res, 'should not return null on evented reader') - result.push(res.num) + if(res) { + result.push(res.num) + } }) stream.on('end', function() { var total = result.reduce(function(prev, cur) { diff --git a/test/helper.js b/test/helper.js index 6e7ea46b..cfc16ced 100644 --- a/test/helper.js +++ b/test/helper.js @@ -1,4 +1,4 @@ -var pg = require('pg') +var pg = require('pg.js') module.exports = function(cb) { describe('pg-query-stream', function() { var client = new pg.Client() diff --git a/test/pauses.js b/test/pauses.js index 66e88836..ff54bf08 100644 --- a/test/pauses.js +++ b/test/pauses.js @@ -7,7 +7,7 @@ var QueryStream = require('../') require('./helper')(function(client) { it('pauses', function(done) { - var stream = new QueryStream('SELECT * FROM generate_series(0, $1) num', [200], {chunkSize: 2, highWaterMark: 2}) + var stream = new QueryStream('SELECT * FROM generate_series(0, $1) num', [200], {batchSize: 2, highWaterMark: 2}) var query = client.query(stream) var pauser = tester.createPauseStream(0.1, 100) query.pipe(JSONStream.stringify()).pipe(concat(function(json) {