node-postgres/index.js
Stephen Sugden e1117155ae Emit 'close' events when query completes
Consider a system where one component is scheduling tasks that yield
streams, and passing them to (unknown) clients for consumption.

It would be useful for the scheduler to know that the query
underlying the stream is completed (so it can continue on to its
next task) without having to wait for the consumer to finish reading
all results.
2013-12-25 13:33:34 -08:00

109 lines
2.3 KiB
JavaScript

var assert = require('assert')
var Readable = require('stream').Readable
var path = require('path')
// Locate the directory holding the Postgres driver's internal modules so
// that `result` and `utils` can be required by path further down.
//
// `pg` is tried first and the directory of its resolved entry point is used
// as-is. If that fails, `pg.js` is tried and `lib` is appended to the
// directory of its entry point (presumably because its entry point sits one
// level above `lib` -- confirm against the pg.js package layout).
//
// require.resolve() throws when a module cannot be found, so the fallback
// needs its own try/catch; otherwise the helpful installation message below
// could never be reached.
var pgdir = false
try {
  pgdir = path.dirname(require.resolve('pg'))
} catch (e) {
  try {
    pgdir = path.dirname(require.resolve('pg.js'))
  } catch (fallbackError) {
    throw new Error("Please install either `pg` or `pg.js` to use this module")
  }
  pgdir = path.join(pgdir, 'lib')
}
var Result = require(path.join(pgdir, 'result'))
var utils = require(path.join(pgdir, 'utils'))
/**
 * Object-mode Readable stream that yields the rows of one query.
 *
 * @param {string} text - query text; must be truthy or the assertion throws.
 * @param {Array} [values] - bind parameters; each one is passed through
 *   utils.prepareValue. Defaults to an empty list.
 * @param {Object} [options]
 * @param {number} [options.highWaterMark] - stream buffer limit; falls back
 *   to 1000 when missing or falsy.
 * @param {number} [options.batchSize] - rows requested per fetch; falls back
 *   to 100 when missing or falsy.
 */
var QueryStream = module.exports = function(text, values, options) {
  var settings = options || { }
  var streamConfig = {
    objectMode: true,
    highWaterMark: settings.highWaterMark || 1000
  }
  Readable.call(this, streamConfig)

  this.text = text
  assert(this.text, 'text cannot be falsy')

  var rawValues = values || []
  this.values = rawValues.map(utils.prepareValue)

  // statement name used by submit(); empty string here
  this.name = ''
  this._result = new Result()
  this.batchSize = settings.batchSize || 100
  // starts out idle; _getRows() clears this flag while a fetch is outstanding
  this._idle = true
}
require('util').inherits(QueryStream, Readable)
/**
 * Readable hook: forwards the requested size straight to _getRows().
 *
 * @param {number} n - size hint handed in by the Readable base class.
 */
QueryStream.prototype._read = function(n) {
  var requested = n
  this._getRows(requested)
}
/**
 * Ask the connection for up to `count` more rows.
 *
 * Does nothing when a fetch is already outstanding (`_idle` is false) or
 * when no connection has been attached yet via submit().
 *
 * @param {number} count - value sent as `rows` in the execute message.
 */
QueryStream.prototype._getRows = function(count) {
  var connection = this.connection
  var canFetch = this._idle && connection
  if (!canFetch) {
    return
  }

  // mark busy until handlePortalSuspended() flips the flag back
  this._idle = false

  var executeMsg = {
    portal: '',
    rows: count
  }
  // NOTE(review): the trailing `true` presumably defers sending until
  // flush() -- confirm against the connection API
  connection.execute(executeMsg, true)
  connection.flush()
}
/**
 * Attach this stream to a connection and start the query.
 *
 * Issues parse, bind and describe on the connection, in that order, then
 * requests the first batch of `batchSize` rows.
 *
 * @param {Object} con - connection exposing parse/bind/describe plus the
 *   methods used by the other handlers (execute, flush, sync).
 */
QueryStream.prototype.submit = function(con) {
  // keep the connection around for later fetches and sync calls
  this.connection = con
  var statementName = this.name

  var parseMsg = {
    text: this.text,
    name: statementName,
    types: []
  }
  con.parse(parseMsg, true)

  var bindMsg = {
    portal: '',
    statement: statementName,
    values: this.values,
    binary: false
  }
  con.bind(bindMsg, true)

  var describeMsg = {
    type: 'P',
    name: statementName
  }
  con.describe(describeMsg, true)

  this._getRows(this.batchSize)
}
/**
 * Register the field descriptions from a row-description message on the
 * internal result object.
 *
 * @param {Object} msg - message whose `fields` are passed to addFields().
 */
QueryStream.prototype.handleRowDescription = function(msg) {
  var fieldDescriptions = msg.fields
  this._result.addFields(fieldDescriptions)
}
/**
 * Parse one data row and push it to the stream's consumer.
 *
 * The return value of push() is kept in `_more` so that
 * handlePortalSuspended() can decide whether to fetch again.
 *
 * @param {Object} msg - message whose `fields` are handed to parseRow().
 */
QueryStream.prototype.handleDataRow = function(msg) {
  this._more = this.push(this._result.parseRow(msg.fields))
}
/**
 * The current batch is finished: mark the stream idle again and, if the
 * last push() reported room for more (`_more` truthy), request the next
 * batch of `batchSize` rows.
 *
 * @param {Object} msg - unused.
 */
QueryStream.prototype.handlePortalSuspended = function(msg) {
  this._idle = true
  if (!this._more) {
    // consumer is backed up; the next _read() call resumes fetching
    return
  }
  this._getRows(this.batchSize)
}
/**
 * The command finished: issue sync() on the connection.
 *
 * @param {Object} msg - unused.
 */
QueryStream.prototype.handleCommandComplete = function(msg) {
  var connection = this.connection
  connection.sync()
}
/**
 * The query is done. Emits 'close' first, then pushes `null` to mark the
 * end of the stream; the order of the two calls is deliberate, so 'close'
 * fires without waiting for the consumer to drain buffered rows.
 */
QueryStream.prototype.handleReadyForQuery = function() {
  var stream = this
  stream.emit('close')
  stream.push(null)
}
/**
 * Error path: issue sync() on the connection, then re-emit the error on
 * the stream as an 'error' event.
 *
 * @param {Error} err - error to surface to stream listeners.
 */
QueryStream.prototype.handleError = function(err) {
  var connection = this.connection
  connection.sync()
  this.emit('error', err)
}