Rebase code on top of pg-cursor

This commit is contained in:
Brian M. Carlson 2014-02-26 09:38:16 -06:00
parent 122bcfb27b
commit 37de9c2ab0
6 changed files with 42 additions and 101 deletions

125
index.js
View File

@ -1,111 +1,46 @@
var assert = require('assert')
var util = require('util')
var Cursor = require('pg-cursor')
var Readable = require('stream').Readable
var path = require('path')
var pgdir = false
try {
pgdir = path.dirname(require.resolve('pg'))
} catch (e) {
pgdir = path.dirname(require.resolve('pg.js'))
if(!pgdir) {
throw new Error("Please install either `pg` or `pg.js` to use this module")
}
pgdir = path.join(pgdir, 'lib')
}
var Result = require(path.join(pgdir, 'result'))
var utils = require(path.join(pgdir, 'utils'))
var QueryStream = module.exports = function(text, values, options) {
options = options || { }
Cursor.call(this, text, values)
Readable.call(this, {
objectMode: true,
highWaterMark: options.highWaterMark || 1000
})
this.text = text
assert(this.text, 'text cannot be falsy')
this.values = (values || []).map(utils.prepareValue)
this.name = ''
this._result = new Result()
this.batchSize = options.batchSize || 100
this._idle = true
this._ready = false
//kick reader
this.read()
}
require('util').inherits(QueryStream, Readable)
QueryStream.prototype._read = function(n) {
this._getRows(n)
}
QueryStream.prototype._getRows = function(count) {
var con = this.connection
if(!this._idle || !this.connection) return;
this._idle = false
con.execute({
portal: '',
rows: count
}, true)
con.flush()
}
QueryStream.prototype.submit = function(con) {
//save reference to connection
this.connection = con
var name = this.name
con.parse({
text: this.text,
name: name,
types: []
}, true)
con.bind({
portal: '',
statement: name,
values: this.values,
binary: false
}, true)
con.describe({
type: 'P',
name: name
}, true)
this._getRows(this.batchSize)
}
QueryStream.prototype.handleRowDescription = function(msg) {
this._result.addFields(msg.fields)
}
QueryStream.prototype.handleDataRow = function(msg) {
var row = this._result.parseRow(msg.fields)
this._more = this.push(row)
}
QueryStream.prototype.handlePortalSuspended = function(msg) {
this._idle = true
if(this._more) {
this._getRows(this.batchSize)
util.inherits(QueryStream, Readable)
for(var key in Cursor.prototype) {
if(key != 'read') {
QueryStream.prototype[key] = Cursor.prototype[key]
}
}
QueryStream.prototype.handleCommandComplete = function(msg) {
this.connection.sync()
}
QueryStream.prototype._fetch = Cursor.prototype.read
QueryStream.prototype.handleReadyForQuery = function() {
this.push(null)
//ensure 'close' fires after end
setImmediate(function() {
this.emit('close')
}.bind(this))
}
QueryStream.prototype.handleError = function(err) {
this.connection.sync()
this.emit('error', err)
QueryStream.prototype._read = function(n) {
if(this._reading) return false;
this._reading = true
var self = this
this._fetch(this.batchSize, function(err, rows) {
if(err) {
return self.emit('error', err)
}
if(!rows.length) {
setImmediate(function() {
self.push(null)
self.once('end', self.emit.bind(self, 'close'))
})
}
self._reading = false
for(var i = 0; i < rows.length; i++) {
self.push(rows[i])
}
})
}

View File

@ -23,14 +23,16 @@
},
"devDependencies": {
"pg.js": "~2.8.0",
"gonna": "0.0.0",
"lodash": "~2.2.1",
"concat-stream": "~1.0.1",
"through": "~2.3.4",
"stream-tester": "0.0.5",
"stream-spec": "~0.3.5",
"jsonstream": "0.0.1",
"jsonstream": "*",
"JSONStream": "~0.7.1",
"mocha": "~1.17.1"
},
"dependencies": {
"pg-cursor": "~0.1.2"
}
}

View File

@ -7,8 +7,9 @@ var QueryStream = require('../')
require('./helper')(function(client) {
it('emits close', function(done) {
var stream = new QueryStream('SELECT * FROM generate_series(0, $1) num', [3], {chunkSize: 2, highWaterMark: 2})
var stream = new QueryStream('SELECT * FROM generate_series(0, $1) num', [3], {batchSize: 2, highWaterMark: 2})
var query = client.query(stream)
query.pipe(concat(function() {}))
query.on('close', done)
})
})

View File

@ -7,10 +7,13 @@ helper(function(client) {
var stream = new QueryStream('SELECT * FROM generate_series(0, 200) num', [])
var query = client.query(stream)
var result = []
var count = 0
stream.on('readable', function() {
var res = stream.read()
assert(res, 'should not return null on evented reader')
result.push(res.num)
if(res) {
result.push(res.num)
}
})
stream.on('end', function() {
var total = result.reduce(function(prev, cur) {

View File

@ -1,4 +1,4 @@
var pg = require('pg')
var pg = require('pg.js')
module.exports = function(cb) {
describe('pg-query-stream', function() {
var client = new pg.Client()

View File

@ -7,7 +7,7 @@ var QueryStream = require('../')
require('./helper')(function(client) {
it('pauses', function(done) {
var stream = new QueryStream('SELECT * FROM generate_series(0, $1) num', [200], {chunkSize: 2, highWaterMark: 2})
var stream = new QueryStream('SELECT * FROM generate_series(0, $1) num', [200], {batchSize: 2, highWaterMark: 2})
var query = client.query(stream)
var pauser = tester.createPauseStream(0.1, 100)
query.pipe(JSONStream.stringify()).pipe(concat(function(json) {