commit 722296f8d2cccfc58536b51699ec741f2124e654 Author: Brian M. Carlson Date: Mon Oct 21 23:57:50 2013 -0500 Initial commit diff --git a/.gitignore b/.gitignore new file mode 100644 index 00000000..3c3629e6 --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +node_modules diff --git a/Makefile b/Makefile new file mode 100644 index 00000000..fc9212a7 --- /dev/null +++ b/Makefile @@ -0,0 +1,5 @@ +SHELL := /bin/sh +.PHONY: test + +test: + find test/ -name "*.js" | xargs -n 1 node diff --git a/README.md b/README.md new file mode 100644 index 00000000..d16827a0 --- /dev/null +++ b/README.md @@ -0,0 +1,71 @@ +# pg-query-stream + +Receive result rows from [pg](https://github.com/brianc/node-postgres) as a readable (object) stream. + +This module __only works with the pure JavaScript client__. + +## installation + +```bash +$ npm install pg +$ npm install pg-query-stream +``` + +_requires pg>=2.8.1_ + +##### - or - + +```bash +$ npm install pg.js +$ npm install pg-query-stream +``` + +_requires pg.js>=2.8.1_ + +## use + +```js +var pg = require('pg') +var QueryStream = require('pg-query-stream') +var JSONStream = require('JSONStream') + +//pipe 1,000,000 rows to stdout without blowing up your memory usage +pg.connect(function(err, client, done) { + if(err) throw err; + var query = new QueryStream('SELECT * FROM generate_series(0, $1) num', [1000000]) + var stream = client.query(query) + stream.pipe(JSONStream.stringify()).pipe(process.stdout) +}) +``` + +The stream uses a cursor on the server so it efficiently keeps only a low number of rows in memory. + +This is especially useful when doing [ETL](http://en.wikipedia.org/wiki/Extract,_transform,_load) on a huge table. Using manual `limit` and `offset` queries to fake out async itteration through your data is cumbersom, and _way way way_ slower than using a cursor. + +## contribution + +I'm very open to contribution! Open a pull request with your code or idea and we'll talk about it. If it's not way insane we'll merge it in too: isn't open source awesome? + +## license + +The MIT License (MIT) + +Copyright (c) 2013 Brian M. Carlson + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in +all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +THE SOFTWARE. diff --git a/index.js b/index.js new file mode 100644 index 00000000..a8b20449 --- /dev/null +++ b/index.js @@ -0,0 +1,105 @@ +var assert = require('assert') +var Readable = require('stream').Readable +var Result = require('pg') + +var path = require('path') + +var pgdir = false +try { + pgdir = path.dirname(require.resolve('pg')) +} catch (e) { + pgdir = path.dirname(require.resolve('pg.js')) +} +if(!pgdir) { + throw new Error("Please install either `pg` or `pg.js` to use this module") +} +var Result = require(path.join(pgdir, 'result')) +var utils = require(path.join(pgdir, 'utils')) + +var QueryStream = module.exports = function(text, values, options) { + options = options || { + highWaterMark: 100, + batchSize: 100 + } + Readable.call(this, { + objectMode: true, + highWaterMark: 100 + }) + this.text = text + assert(this.text, 'text cannot be falsy') + this.values = (values || []).map(utils.prepareValue) + this.name = '' + this._result = new Result() + this.batchSize = 100 + this._idle = true +} + +require('util').inherits(QueryStream, Readable) + +QueryStream.prototype._read = function(n) { + this._getRows(n) +} + +QueryStream.prototype._getRows = function(count) { + var con = this.connection + if(!this._idle || !this.connection) return; + this._idle = false + con.execute({ + portal: '', + rows: count + }, true) + + con.flush() +} + +QueryStream.prototype.submit = function(con) { + //save reference to connection + this.connection = con + + var name = this.name + + con.parse({ + text: this.text, + name: name, + types: [] + }, true) + + con.bind({ + portal: '', + statement: name, + values: this.values, + binary: false + }, true) + + con.describe({ + type: 'P', + name: name + }, true) + + this._getRows(this.batchSize) + +} + +QueryStream.prototype.handleRowDescription = function(msg) { + this._result.addFields(msg.fields) +} + +QueryStream.prototype.handleDataRow = function(msg) { + var row = this._result.parseRow(msg.fields) + this._more = this.push(row) +} + +QueryStream.prototype.handlePortalSuspended = function(msg) { + this._idle = true + if(this._more) { + this._getRows(this.batchSize) + } +} + +QueryStream.prototype.handleCommandComplete = function(msg) { + this.connection.sync() +} + +QueryStream.prototype.handleReadyForQuery = function() { + this.push(null) +} diff --git a/package.json b/package.json new file mode 100644 index 00000000..0a2b3473 --- /dev/null +++ b/package.json @@ -0,0 +1,35 @@ +{ + "name": "pg-query-stream", + "version": "0.0.0", + "description": "Postgres query result returned as readable stream", + "main": "index.js", + "scripts": { + "test": "node test" + }, + "repository": { + "type": "git", + "url": "git://github.com/brianc/node-pg-query-stream.git" + }, + "keywords": [ + "postgres", + "pg", + "query", + "stream" + ], + "author": "Brian M. Carlson", + "license": "BSD-2-Clause", + "bugs": { + "url": "https://github.com/brianc/node-pg-query-stream/issues" + }, + "devDependencies": { + "pg.js": "~2.8.0", + "gonna": "0.0.0", + "lodash": "~2.2.1", + "concat-stream": "~1.0.1", + "through": "~2.3.4", + "stream-tester": "0.0.5", + "stream-spec": "~0.3.5", + "jsonstream": "0.0.1", + "JSONStream": "~0.7.1" + } +} diff --git a/test/concat.js b/test/concat.js new file mode 100644 index 00000000..b97ea4c1 --- /dev/null +++ b/test/concat.js @@ -0,0 +1,26 @@ +var pg = require('pg') +var assert = require('assert') +var gonna = require('gonna') +var _ = require('lodash') +var concat = require('concat-stream') +var through = require('through') + +var QueryStream = require('../') + +var client = new pg.Client() + +var connected = gonna('connect', 100, function() { + var stream = new QueryStream('SELECT * FROM generate_series(0, 200) num', []) + var query = client.query(stream) + query.pipe(through(function(row) { + this.push(row.num) + })).pipe(concat(function(result) { + var total = result.reduce(function(prev, cur) { + return prev + cur + }) + assert.equal(total, 20100) + })) + stream.on('end', client.end.bind(client)) +}) + +client.connect(connected) diff --git a/test/fast-reader.js b/test/fast-reader.js new file mode 100644 index 00000000..629848fb --- /dev/null +++ b/test/fast-reader.js @@ -0,0 +1,29 @@ +var pg = require('pg') +var assert = require('assert') +var gonna = require('gonna') +var _ = require('lodash') + +var QueryStream = require('../') + +var client = new pg.Client() + +var connected = gonna('connect', 100, function() { + var stream = new QueryStream('SELECT * FROM generate_series(0, 200) num', []) + var query = client.query(stream) + var result = [] + stream.on('readable', function() { + var res = stream.read() + assert(res, 'should not return null on evented reader') + result.push(res.num) + }) + stream.on('end', client.end.bind(client)) + stream.on('end', function() { + var total = result.reduce(function(prev, cur) { + return prev + cur + }) + assert.equal(total, 20100) + }) + assert.strictEqual(query.read(2), null) +}) + +client.connect(connected) diff --git a/test/pauses.js b/test/pauses.js new file mode 100644 index 00000000..7c61c8e8 --- /dev/null +++ b/test/pauses.js @@ -0,0 +1,25 @@ +var pg = require('pg') +var assert = require('assert') +var gonna = require('gonna') +var _ = require('lodash') +var concat = require('concat-stream') +var through = require('through') +var tester = require('stream-tester') +var JSONStream = require('JSONStream') +var stream = require('stream') + +var QueryStream = require('../') + +var client = new pg.Client() + +var connected = gonna('connect', 100, function() { + var stream = new QueryStream('SELECT * FROM generate_series(0, $1) num', [200], {chunkSize: 2, highWaterMark: 2}) + var query = client.query(stream) + var pauser = tester.createPauseStream(0.1, 100) + query.pipe(JSONStream.stringify()).pipe(concat(function(json) { + JSON.parse(json) + client.end() + })) +}) + +client.connect(connected) diff --git a/test/stream-tester.js b/test/stream-tester.js new file mode 100644 index 00000000..6975247a --- /dev/null +++ b/test/stream-tester.js @@ -0,0 +1,21 @@ +var pg = require('pg') +var assert = require('assert') +var gonna = require('gonna') +var tester = require('stream-tester') + +var QueryStream = require('../') + +var client = new pg.Client() + +var connected = gonna('connect', 100, function() { + var stream = new QueryStream('SELECT * FROM generate_series(0, 200) num', []) + var spec = require('stream-spec') + var query = client.query(stream) + spec(query) + .readable() + .pausable({strict: true}) + .validateOnExit() + stream.on('end', client.end.bind(client)) +}) + +client.connect(connected)