commit 5d27cf24e2e87704a61deade07122a722b39ede6 Author: Brian Carlson Date: Mon Oct 7 11:52:36 2013 -0500 Initial commit diff --git a/.gitignore b/.gitignore new file mode 100644 index 00000000..3c3629e6 --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +node_modules diff --git a/index.js b/index.js new file mode 100644 index 00000000..8960a352 --- /dev/null +++ b/index.js @@ -0,0 +1,116 @@ +var path = require('path') + +var resultPath = path.dirname(require.resolve('pg.js')) + '/lib/result' +var Result = require(resultPath) +var Client = require('pg.js').Client + +var Cursor = function(text, values) { + this.text = text + this.values = values + this._connection = null +} + +Cursor.prototype._connect = function(cb) { + if(this._connected) return setImmediate(cb); + this._connected = true + var self = this + var client = new Client() + client.connect(function(err) { + if(err) return cb(err); + + //remove all listeners from + //client's connection and discard the client + self.connection = client.connection + self.connection.removeAllListeners() + + var con = self.connection + + con.parse({ + text: self.text + }, true) + + con.bind({ + values: self.values + }, true) + + con.describe({ + type: 'P', + name: '' //use unamed portal + }, true) + + con.flush() + + var onError = function(err) { + cb(err) + con.end() + } + + con.once('error', onError) + + con.on('rowDescription', function(msg) { + self.rowDescription = msg + con.removeListener('error', onError) + cb(null, con) + }) + + var onRow = function(msg) { + var row = self.result.parseRow(msg.fields) + self.result.addRow(row) + } + + con.on('dataRow', onRow) + + con.once('readyForQuery', function() { + con.end() + }) + + con.once('commandComplete', function() { + self._complete = true + con.sync() + }) + }) +} + +Cursor.prototype._getRows = function(con, n, cb) { + if(this._done) { + return cb(null, [], false) + } + var msg = { + portal: '', + rows: n + } + con.execute(msg, true) + con.flush() + this.result = new Result() + this.result.addFields(this.rowDescription.fields) + + var self = this + + var onComplete = function() { + self._done = true + cb(null, self.result.rows, self.result) + } + con.once('commandComplete', onComplete) + + con.once('portalSuspended', function() { + cb(null, self.result.rows, self.result) + con.removeListener('commandComplete', onComplete) + }) +} + +Cursor.prototype.end = function(cb) { + this.connection.end() + this.connection.stream.once('end', cb) +} + +Cursor.prototype.read = function(rows, cb) { + var self = this + this._connect(function(err) { + if(err) return cb(err); + self._getRows(self.connection, rows, cb) + }) +} + +module.exports = function(query, params) { + return new Cursor(query, params) +} diff --git a/package.json b/package.json new file mode 100644 index 00000000..993f57a0 --- /dev/null +++ b/package.json @@ -0,0 +1,20 @@ +{ + "name": "node-pg-cursor", + "version": "0.0.0", + "description": "", + "main": "index.js", + "directories": { + "test": "test" + }, + "scripts": { + "test": "echo \"Error: no test specified\" && exit 1" + }, + "author": "", + "license": "BSD", + "devDependencies": { + "gonna": "0.0.0" + }, + "dependencies": { + "pg.js": "~2.7.0" + } +} diff --git a/test/index.js b/test/index.js new file mode 100644 index 00000000..4d45a85c --- /dev/null +++ b/test/index.js @@ -0,0 +1,94 @@ +var assert = require('assert') +var pgCursor = require('../') +var gonna = require('gonna') + +var text = 'SELECT generate_series as num FROM generate_series(0, 5)' +var values = [] + +var test = function(name, fn, timeout) { + timeout = timeout || 1000 + var done = gonna(name, timeout, function(err) { + console.log(name) + assert.ifError(err) + }) + fn(done) +} + +test('fetch 6 when asking for 10', function(done) { + var cursor = pgCursor(text) + cursor.read(10, function(err, res) { + assert.ifError(err) + assert.equal(res.length, 6) + done() + }) +}) + +test('end before reading to end', function(done) { + var cursor = pgCursor(text) + cursor.read(3, function(err, res) { + assert.equal(res.length, 3) + cursor.end(done) + }) +}) + +test('callback with error', function(done) { + var cursor = pgCursor('select asdfasdf') + cursor.read(1, function(err) { + assert(err) + done() + }) +}) + + +test('read a partial chunk of data', function(done) { + var cursor = pgCursor(text) + cursor.read(2, function(err, res) { + assert.equal(res.length, 2) + cursor.read(3, function(err, res) { + assert.equal(res.length, 3) + cursor.read(1, function(err, res) { + assert.equal(res.length, 1) + cursor.read(1, function(err, res) { + assert.ifError(err) + assert.strictEqual(res.length, 0) + done() + }) + }) + }) + }) +}) + +test('read return length 0 past the end', function(done) { + var cursor = pgCursor(text) + cursor.read(2, function(err, res) { + cursor.read(100, function(err, res) { + assert.equal(res.length, 4) + cursor.read(100, function(err, res) { + assert.equal(res.length, 0) + done() + }) + }) + }) +}) + +test('read huge result', function(done) { + var text = 'SELECT generate_series as num FROM generate_series(0, 1000000)' + var values = [] + cursor = pgCursor(text, values); + var count = 0; + var more = function() { + cursor.read(1000, function(err, rows) { + if(err) return done(err); + if(!rows.length) { + assert.equal(count, 1000001) + return done() + } + count += rows.length; + if(count%100000 == 0) { + console.log(count) + } + setImmediate(more) + }) + } + more() +}, 100000)