mirror of
https://github.com/brianc/node-postgres.git
synced 2025-12-08 20:16:25 +00:00
Add 'packages/pg-query-stream/' from commit '9ced05e8aab65f3fdf1a67add87bfc9035e487e8'
git-subtree-dir: packages/pg-query-stream git-subtree-mainline: cccf84e14b3281b753e1baab7bc194aaac5024a8 git-subtree-split: 9ced05e8aab65f3fdf1a67add87bfc9035e487e8
This commit is contained in:
commit
db1b95e5f3
9
packages/pg-query-stream/.eslintrc
Normal file
9
packages/pg-query-stream/.eslintrc
Normal file
@ -0,0 +1,9 @@
|
||||
{
|
||||
"extends": "standard",
|
||||
"env": {
|
||||
"mocha": true
|
||||
},
|
||||
"rules": {
|
||||
"no-new-func": "off"
|
||||
}
|
||||
}
|
||||
1
packages/pg-query-stream/.gitignore
vendored
Normal file
1
packages/pg-query-stream/.gitignore
vendored
Normal file
@ -0,0 +1 @@
|
||||
node_modules
|
||||
14
packages/pg-query-stream/.travis.yml
Normal file
14
packages/pg-query-stream/.travis.yml
Normal file
@ -0,0 +1,14 @@
|
||||
language: node_js
|
||||
dist: trusty
|
||||
node_js:
|
||||
- "8"
|
||||
- "10"
|
||||
- "12"
|
||||
env:
|
||||
- PGUSER=postgres
|
||||
services:
|
||||
- postgresql
|
||||
addons:
|
||||
postgresql: "9.6"
|
||||
before_script:
|
||||
- psql -c 'create database travis;' -U postgres | true
|
||||
9
packages/pg-query-stream/LICENSE
Normal file
9
packages/pg-query-stream/LICENSE
Normal file
@ -0,0 +1,9 @@
|
||||
The MIT License (MIT)
|
||||
|
||||
Copyright (c) 2013 Brian M. Carlson
|
||||
|
||||
Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:
|
||||
|
||||
The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.
|
||||
|
||||
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
|
||||
14
packages/pg-query-stream/Makefile
Normal file
14
packages/pg-query-stream/Makefile
Normal file
@ -0,0 +1,14 @@
|
||||
.PHONY: publish-patch test
|
||||
|
||||
test:
|
||||
npm test
|
||||
|
||||
patch: test
|
||||
npm version patch -m "Bump version"
|
||||
git push origin master --tags
|
||||
npm publish
|
||||
|
||||
minor: test
|
||||
npm version minor -m "Bump version"
|
||||
git push origin master --tags
|
||||
npm publish
|
||||
68
packages/pg-query-stream/README.md
Normal file
68
packages/pg-query-stream/README.md
Normal file
@ -0,0 +1,68 @@
|
||||
# pg-query-stream
|
||||
|
||||
[](https://travis-ci.org/brianc/node-pg-query-stream)
|
||||
|
||||
Receive result rows from [pg](https://github.com/brianc/node-postgres) as a readable (object) stream.
|
||||
|
||||
|
||||
## installation
|
||||
|
||||
```bash
|
||||
$ npm install pg --save
|
||||
$ npm install pg-query-stream --save
|
||||
```
|
||||
|
||||
_requires pg>=2.8.1_
|
||||
|
||||
|
||||
## use
|
||||
|
||||
```js
|
||||
const pg = require('pg')
|
||||
const QueryStream = require('pg-query-stream')
|
||||
const JSONStream = require('JSONStream')
|
||||
|
||||
//pipe 1,000,000 rows to stdout without blowing up your memory usage
|
||||
pg.connect((err, client, done) => {
|
||||
if (err) throw err;
|
||||
const query = new QueryStream('SELECT * FROM generate_series(0, $1) num', [1000000])
|
||||
const stream = client.query(query)
|
||||
//release the client when the stream is finished
|
||||
stream.on('end', done)
|
||||
stream.pipe(JSONStream.stringify()).pipe(process.stdout)
|
||||
})
|
||||
```
|
||||
|
||||
The stream uses a cursor on the server so it efficiently keeps only a low number of rows in memory.
|
||||
|
||||
This is especially useful when doing [ETL](http://en.wikipedia.org/wiki/Extract,_transform,_load) on a huge table. Using manual `limit` and `offset` queries to fake out async itteration through your data is cumbersome, and _way way way_ slower than using a cursor.
|
||||
|
||||
_note: this module only works with the JavaScript client, and does not work with the native bindings. libpq doesn't expose the protocol at a level where a cursor can be manipulated directly_
|
||||
|
||||
## contribution
|
||||
|
||||
I'm very open to contribution! Open a pull request with your code or idea and we'll talk about it. If it's not way insane we'll merge it in too: isn't open source awesome?
|
||||
|
||||
## license
|
||||
|
||||
The MIT License (MIT)
|
||||
|
||||
Copyright (c) 2013 Brian M. Carlson
|
||||
|
||||
Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||
of this software and associated documentation files (the "Software"), to deal
|
||||
in the Software without restriction, including without limitation the rights
|
||||
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
||||
copies of the Software, and to permit persons to whom the Software is
|
||||
furnished to do so, subject to the following conditions:
|
||||
|
||||
The above copyright notice and this permission notice shall be included in
|
||||
all copies or substantial portions of the Software.
|
||||
|
||||
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||||
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||||
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
||||
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||||
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
||||
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
|
||||
THE SOFTWARE.
|
||||
61
packages/pg-query-stream/index.js
Normal file
61
packages/pg-query-stream/index.js
Normal file
@ -0,0 +1,61 @@
|
||||
'use strict'
|
||||
var Cursor = require('pg-cursor')
|
||||
var Readable = require('stream').Readable
|
||||
|
||||
class PgQueryStream extends Readable {
|
||||
constructor (text, values, options) {
|
||||
super(Object.assign({ objectMode: true }, options))
|
||||
this.cursor = new Cursor(text, values, options)
|
||||
this._reading = false
|
||||
this._closed = false
|
||||
this.batchSize = (options || {}).batchSize || 100
|
||||
|
||||
// delegate Submittable callbacks to cursor
|
||||
this.handleRowDescription = this.cursor.handleRowDescription.bind(this.cursor)
|
||||
this.handleDataRow = this.cursor.handleDataRow.bind(this.cursor)
|
||||
this.handlePortalSuspended = this.cursor.handlePortalSuspended.bind(this.cursor)
|
||||
this.handleCommandComplete = this.cursor.handleCommandComplete.bind(this.cursor)
|
||||
this.handleReadyForQuery = this.cursor.handleReadyForQuery.bind(this.cursor)
|
||||
this.handleError = this.cursor.handleError.bind(this.cursor)
|
||||
}
|
||||
|
||||
submit (connection) {
|
||||
this.cursor.submit(connection)
|
||||
}
|
||||
|
||||
close (callback) {
|
||||
this._closed = true
|
||||
const cb = callback || (() => this.emit('close'))
|
||||
this.cursor.close(cb)
|
||||
}
|
||||
|
||||
_read (size) {
|
||||
if (this._reading || this._closed) {
|
||||
return false
|
||||
}
|
||||
this._reading = true
|
||||
const readAmount = Math.max(size, this.batchSize)
|
||||
this.cursor.read(readAmount, (err, rows) => {
|
||||
if (this._closed) {
|
||||
return
|
||||
}
|
||||
if (err) {
|
||||
return this.emit('error', err)
|
||||
}
|
||||
// if we get a 0 length array we've read to the end of the cursor
|
||||
if (!rows.length) {
|
||||
this._closed = true
|
||||
setImmediate(() => this.emit('close'))
|
||||
return this.push(null)
|
||||
}
|
||||
|
||||
// push each row into the stream
|
||||
this._reading = false
|
||||
for (var i = 0; i < rows.length; i++) {
|
||||
this.push(rows[i])
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
module.exports = PgQueryStream
|
||||
49
packages/pg-query-stream/package.json
Normal file
49
packages/pg-query-stream/package.json
Normal file
@ -0,0 +1,49 @@
|
||||
{
|
||||
"name": "pg-query-stream",
|
||||
"version": "2.0.1",
|
||||
"description": "Postgres query result returned as readable stream",
|
||||
"main": "index.js",
|
||||
"scripts": {
|
||||
"test": "mocha"
|
||||
},
|
||||
"repository": {
|
||||
"type": "git",
|
||||
"url": "git://github.com/brianc/node-pg-query-stream.git"
|
||||
},
|
||||
"keywords": [
|
||||
"postgres",
|
||||
"pg",
|
||||
"query",
|
||||
"stream"
|
||||
],
|
||||
"author": "Brian M. Carlson",
|
||||
"license": "MIT",
|
||||
"bugs": {
|
||||
"url": "https://github.com/brianc/node-pg-query-stream/issues"
|
||||
},
|
||||
"devDependencies": {
|
||||
"JSONStream": "~0.7.1",
|
||||
"concat-stream": "~1.0.1",
|
||||
"eslint": "^4.4.0",
|
||||
"eslint-config-standard": "^10.2.1",
|
||||
"eslint-plugin-import": "^2.7.0",
|
||||
"eslint-plugin-node": "^5.1.1",
|
||||
"eslint-plugin-promise": "^3.5.0",
|
||||
"eslint-plugin-standard": "^3.0.1",
|
||||
"mocha": "^6.2.2",
|
||||
"pg": "^7.5.0",
|
||||
"prettier": "^1.18.2",
|
||||
"stream-spec": "~0.3.5",
|
||||
"stream-tester": "0.0.5",
|
||||
"through": "~2.3.4"
|
||||
},
|
||||
"prettier": {
|
||||
"semi": false,
|
||||
"printWidth": 120,
|
||||
"trailingComma": "es5",
|
||||
"singleQuote": true
|
||||
},
|
||||
"dependencies": {
|
||||
"pg-cursor": "^2.0.1"
|
||||
}
|
||||
}
|
||||
57
packages/pg-query-stream/test/async-iterator.es6
Normal file
57
packages/pg-query-stream/test/async-iterator.es6
Normal file
@ -0,0 +1,57 @@
|
||||
const QueryStream = require('../')
|
||||
const pg = require('pg')
|
||||
const assert = require('assert')
|
||||
|
||||
const queryText = 'SELECT * FROM generate_series(0, 200) num'
|
||||
describe('Async iterator', () => {
|
||||
it('works', async () => {
|
||||
const stream = new QueryStream(queryText, [])
|
||||
const client = new pg.Client()
|
||||
await client.connect()
|
||||
const query = client.query(stream)
|
||||
const rows = []
|
||||
for await (const row of query) {
|
||||
rows.push(row)
|
||||
}
|
||||
assert.equal(rows.length, 201)
|
||||
await client.end()
|
||||
})
|
||||
|
||||
it('can async iterate and then do a query afterwards', async () => {
|
||||
const stream = new QueryStream(queryText, [])
|
||||
const client = new pg.Client()
|
||||
await client.connect()
|
||||
const query = client.query(stream)
|
||||
const iteratorRows = []
|
||||
for await (const row of query) {
|
||||
iteratorRows.push(row)
|
||||
}
|
||||
assert.equal(iteratorRows.length, 201)
|
||||
const { rows } = await client.query('SELECT NOW()')
|
||||
assert.equal(rows.length, 1)
|
||||
await client.end()
|
||||
})
|
||||
|
||||
it('can async iterate multiple times with a pool', async () => {
|
||||
const pool = new pg.Pool({ max: 1 })
|
||||
|
||||
const allRows = []
|
||||
const run = async () => {
|
||||
// get the client
|
||||
const client = await pool.connect()
|
||||
// stream some rows
|
||||
const stream = new QueryStream(queryText, [])
|
||||
const iteratorRows = []
|
||||
client.query(stream)
|
||||
for await (const row of stream) {
|
||||
iteratorRows.push(row)
|
||||
allRows.push(row)
|
||||
}
|
||||
assert.equal(iteratorRows.length, 201)
|
||||
client.release()
|
||||
}
|
||||
await Promise.all([run(), run(), run()])
|
||||
assert.equal(allRows.length, 603)
|
||||
await pool.end()
|
||||
})
|
||||
})
|
||||
4
packages/pg-query-stream/test/async-iterator.js
Normal file
4
packages/pg-query-stream/test/async-iterator.js
Normal file
@ -0,0 +1,4 @@
|
||||
// only newer versions of node support async iterator
|
||||
if (!process.version.startsWith('v8')) {
|
||||
require('./async-iterator.es6')
|
||||
}
|
||||
52
packages/pg-query-stream/test/close.js
Normal file
52
packages/pg-query-stream/test/close.js
Normal file
@ -0,0 +1,52 @@
|
||||
var assert = require('assert')
|
||||
var concat = require('concat-stream')
|
||||
|
||||
var QueryStream = require('../')
|
||||
var helper = require('./helper')
|
||||
|
||||
helper('close', function (client) {
|
||||
it('emits close', function (done) {
|
||||
var stream = new QueryStream('SELECT * FROM generate_series(0, $1) num', [3], {batchSize: 2, highWaterMark: 2})
|
||||
var query = client.query(stream)
|
||||
query.pipe(concat(function () {}))
|
||||
query.on('close', done)
|
||||
})
|
||||
})
|
||||
|
||||
helper('early close', function (client) {
|
||||
it('can be closed early', function (done) {
|
||||
var stream = new QueryStream('SELECT * FROM generate_series(0, $1) num', [20000], {batchSize: 2, highWaterMark: 2})
|
||||
var query = client.query(stream)
|
||||
var readCount = 0
|
||||
query.on('readable', function () {
|
||||
readCount++
|
||||
query.read()
|
||||
})
|
||||
query.once('readable', function () {
|
||||
query.close()
|
||||
})
|
||||
query.on('close', function () {
|
||||
assert(readCount < 10, 'should not have read more than 10 rows')
|
||||
done()
|
||||
})
|
||||
})
|
||||
})
|
||||
|
||||
helper('close callback', function (client) {
|
||||
it('notifies an optional callback when the conneciton is closed', function (done) {
|
||||
var stream = new QueryStream('SELECT * FROM generate_series(0, $1) num', [10], {batchSize: 2, highWaterMark: 2})
|
||||
var query = client.query(stream)
|
||||
query.once('readable', function () { // only reading once
|
||||
query.read()
|
||||
})
|
||||
query.once('readable', function () {
|
||||
query.close(function () {
|
||||
// nothing to assert. This test will time out if the callback does not work.
|
||||
done()
|
||||
})
|
||||
})
|
||||
query.on('close', function () {
|
||||
assert(false, 'close event should not fire') // no close event because we did not read to the end of the stream.
|
||||
})
|
||||
})
|
||||
})
|
||||
22
packages/pg-query-stream/test/concat.js
Normal file
22
packages/pg-query-stream/test/concat.js
Normal file
@ -0,0 +1,22 @@
|
||||
var assert = require('assert')
|
||||
var concat = require('concat-stream')
|
||||
var through = require('through')
|
||||
var helper = require('./helper')
|
||||
|
||||
var QueryStream = require('../')
|
||||
|
||||
helper('concat', function (client) {
|
||||
it('concats correctly', function (done) {
|
||||
var stream = new QueryStream('SELECT * FROM generate_series(0, 200) num', [])
|
||||
var query = client.query(stream)
|
||||
query.pipe(through(function (row) {
|
||||
this.push(row.num)
|
||||
})).pipe(concat(function (result) {
|
||||
var total = result.reduce(function (prev, cur) {
|
||||
return prev + cur
|
||||
})
|
||||
assert.equal(total, 20100)
|
||||
}))
|
||||
stream.on('end', done)
|
||||
})
|
||||
})
|
||||
10
packages/pg-query-stream/test/config.js
Normal file
10
packages/pg-query-stream/test/config.js
Normal file
@ -0,0 +1,10 @@
|
||||
var assert = require('assert')
|
||||
var QueryStream = require('../')
|
||||
|
||||
var stream = new QueryStream('SELECT NOW()', [], {
|
||||
highWaterMark: 999,
|
||||
batchSize: 88
|
||||
})
|
||||
|
||||
assert.equal(stream._readableState.highWaterMark, 999)
|
||||
assert.equal(stream.batchSize, 88)
|
||||
22
packages/pg-query-stream/test/error.js
Normal file
22
packages/pg-query-stream/test/error.js
Normal file
@ -0,0 +1,22 @@
|
||||
var assert = require('assert')
|
||||
var helper = require('./helper')
|
||||
|
||||
var QueryStream = require('../')
|
||||
|
||||
helper('error', function (client) {
|
||||
it('receives error on stream', function (done) {
|
||||
var stream = new QueryStream('SELECT * FROM asdf num', [])
|
||||
var query = client.query(stream)
|
||||
query.on('error', function (err) {
|
||||
assert(err)
|
||||
assert.equal(err.code, '42P01')
|
||||
done()
|
||||
}).on('data', function () {
|
||||
// noop to kick of reading
|
||||
})
|
||||
})
|
||||
|
||||
it('continues to function after stream', function (done) {
|
||||
client.query('SELECT NOW()', done)
|
||||
})
|
||||
})
|
||||
35
packages/pg-query-stream/test/fast-reader.js
Normal file
35
packages/pg-query-stream/test/fast-reader.js
Normal file
@ -0,0 +1,35 @@
|
||||
var assert = require('assert')
|
||||
var helper = require('./helper')
|
||||
var QueryStream = require('../')
|
||||
|
||||
helper('fast reader', function (client) {
|
||||
it('works', function (done) {
|
||||
var stream = new QueryStream('SELECT * FROM generate_series(0, 200) num', [])
|
||||
var query = client.query(stream)
|
||||
var result = []
|
||||
stream.on('readable', function () {
|
||||
var res = stream.read()
|
||||
while (res) {
|
||||
if (result.length !== 201) {
|
||||
assert(res, 'should not return null on evented reader')
|
||||
} else {
|
||||
// a readable stream will emit a null datum when it finishes being readable
|
||||
// https://nodejs.org/api/stream.html#stream_event_readable
|
||||
assert.equal(res, null)
|
||||
}
|
||||
if (res) {
|
||||
result.push(res.num)
|
||||
}
|
||||
res = stream.read()
|
||||
}
|
||||
})
|
||||
stream.on('end', function () {
|
||||
var total = result.reduce(function (prev, cur) {
|
||||
return prev + cur
|
||||
})
|
||||
assert.equal(total, 20100)
|
||||
done()
|
||||
})
|
||||
assert.strictEqual(query.read(2), null)
|
||||
})
|
||||
})
|
||||
17
packages/pg-query-stream/test/helper.js
Normal file
17
packages/pg-query-stream/test/helper.js
Normal file
@ -0,0 +1,17 @@
|
||||
var pg = require('pg')
|
||||
module.exports = function (name, cb) {
|
||||
describe(name, function () {
|
||||
var client = new pg.Client()
|
||||
|
||||
before(function (done) {
|
||||
client.connect(done)
|
||||
})
|
||||
|
||||
cb(client)
|
||||
|
||||
after(function (done) {
|
||||
client.end()
|
||||
client.on('end', done)
|
||||
})
|
||||
})
|
||||
}
|
||||
15
packages/pg-query-stream/test/instant.js
Normal file
15
packages/pg-query-stream/test/instant.js
Normal file
@ -0,0 +1,15 @@
|
||||
var assert = require('assert')
|
||||
var concat = require('concat-stream')
|
||||
|
||||
var QueryStream = require('../')
|
||||
|
||||
require('./helper')('instant', function (client) {
|
||||
it('instant', function (done) {
|
||||
var query = new QueryStream('SELECT pg_sleep(1)', [])
|
||||
var stream = client.query(query)
|
||||
stream.pipe(concat(function (res) {
|
||||
assert.equal(res.length, 1)
|
||||
done()
|
||||
}))
|
||||
})
|
||||
})
|
||||
32
packages/pg-query-stream/test/issue-3.js
Normal file
32
packages/pg-query-stream/test/issue-3.js
Normal file
@ -0,0 +1,32 @@
|
||||
var pg = require('pg')
|
||||
var QueryStream = require('../')
|
||||
describe('end semantics race condition', function () {
|
||||
before(function (done) {
|
||||
var client = new pg.Client()
|
||||
client.connect()
|
||||
client.on('drain', client.end.bind(client))
|
||||
client.on('end', done)
|
||||
client.query('create table IF NOT EXISTS p(id serial primary key)')
|
||||
client.query('create table IF NOT EXISTS c(id int primary key references p)')
|
||||
})
|
||||
it('works', function (done) {
|
||||
var client1 = new pg.Client()
|
||||
client1.connect()
|
||||
var client2 = new pg.Client()
|
||||
client2.connect()
|
||||
|
||||
var qr = new QueryStream('INSERT INTO p DEFAULT VALUES RETURNING id')
|
||||
client1.query(qr)
|
||||
var id = null
|
||||
qr.on('data', function (row) {
|
||||
id = row.id
|
||||
})
|
||||
qr.on('end', function () {
|
||||
client2.query('INSERT INTO c(id) VALUES ($1)', [id], function (err, rows) {
|
||||
client1.end()
|
||||
client2.end()
|
||||
done(err)
|
||||
})
|
||||
})
|
||||
})
|
||||
})
|
||||
1
packages/pg-query-stream/test/mocha.opts
Normal file
1
packages/pg-query-stream/test/mocha.opts
Normal file
@ -0,0 +1 @@
|
||||
--bail
|
||||
38
packages/pg-query-stream/test/passing-options.js
Normal file
38
packages/pg-query-stream/test/passing-options.js
Normal file
@ -0,0 +1,38 @@
|
||||
var assert = require('assert')
|
||||
var helper = require('./helper')
|
||||
var QueryStream = require('../')
|
||||
|
||||
helper('passing options', function(client) {
|
||||
it('passes row mode array', function(done) {
|
||||
var stream = new QueryStream('SELECT * FROM generate_series(0, 10) num', [], { rowMode: 'array' })
|
||||
var query = client.query(stream)
|
||||
var result = []
|
||||
query.on('data', datum => {
|
||||
result.push(datum)
|
||||
})
|
||||
query.on('end', () => {
|
||||
const expected = new Array(11).fill(0).map((_, i) => [i])
|
||||
assert.deepEqual(result, expected)
|
||||
done()
|
||||
})
|
||||
})
|
||||
|
||||
it('passes custom types', function(done) {
|
||||
const types = {
|
||||
getTypeParser: () => string => string,
|
||||
}
|
||||
var stream = new QueryStream('SELECT * FROM generate_series(0, 10) num', [], { types })
|
||||
var query = client.query(stream)
|
||||
var result = []
|
||||
query.on('data', datum => {
|
||||
result.push(datum)
|
||||
})
|
||||
query.on('end', () => {
|
||||
const expected = new Array(11).fill(0).map((_, i) => ({
|
||||
num: i.toString(),
|
||||
}))
|
||||
assert.deepEqual(result, expected)
|
||||
done()
|
||||
})
|
||||
})
|
||||
})
|
||||
18
packages/pg-query-stream/test/pauses.js
Normal file
18
packages/pg-query-stream/test/pauses.js
Normal file
@ -0,0 +1,18 @@
|
||||
var concat = require('concat-stream')
|
||||
var tester = require('stream-tester')
|
||||
var JSONStream = require('JSONStream')
|
||||
|
||||
var QueryStream = require('../')
|
||||
|
||||
require('./helper')('pauses', function (client) {
|
||||
it('pauses', function (done) {
|
||||
this.timeout(5000)
|
||||
var stream = new QueryStream('SELECT * FROM generate_series(0, $1) num', [200], {batchSize: 2, highWaterMark: 2})
|
||||
var query = client.query(stream)
|
||||
var pauser = tester.createPauseStream(0.1, 100)
|
||||
query.pipe(JSONStream.stringify()).pipe(pauser).pipe(concat(function (json) {
|
||||
JSON.parse(json)
|
||||
done()
|
||||
}))
|
||||
})
|
||||
})
|
||||
26
packages/pg-query-stream/test/slow-reader.js
Normal file
26
packages/pg-query-stream/test/slow-reader.js
Normal file
@ -0,0 +1,26 @@
|
||||
var helper = require('./helper')
|
||||
var QueryStream = require('../')
|
||||
var concat = require('concat-stream')
|
||||
|
||||
var Transform = require('stream').Transform
|
||||
|
||||
var mapper = new Transform({objectMode: true})
|
||||
|
||||
mapper._transform = function (obj, enc, cb) {
|
||||
this.push(obj)
|
||||
setTimeout(cb, 5)
|
||||
}
|
||||
|
||||
helper('slow reader', function (client) {
|
||||
it('works', function (done) {
|
||||
this.timeout(50000)
|
||||
var stream = new QueryStream('SELECT * FROM generate_series(0, 201) num', [], {highWaterMark: 100, batchSize: 50})
|
||||
stream.on('end', function () {
|
||||
// console.log('stream end')
|
||||
})
|
||||
client.query(stream)
|
||||
stream.pipe(mapper).pipe(concat(function (res) {
|
||||
done()
|
||||
}))
|
||||
})
|
||||
})
|
||||
26
packages/pg-query-stream/test/stream-tester-timestamp.js
Normal file
26
packages/pg-query-stream/test/stream-tester-timestamp.js
Normal file
@ -0,0 +1,26 @@
|
||||
var QueryStream = require('../')
|
||||
var spec = require('stream-spec')
|
||||
var assert = require('assert')
|
||||
|
||||
require('./helper')('stream tester timestamp', function (client) {
|
||||
it('should not warn about max listeners', function (done) {
|
||||
var sql = 'SELECT * FROM generate_series(\'1983-12-30 00:00\'::timestamp, \'2013-12-30 00:00\', \'1 years\')'
|
||||
var stream = new QueryStream(sql, [])
|
||||
var ended = false
|
||||
var query = client.query(stream)
|
||||
query.on('end', function () { ended = true })
|
||||
spec(query)
|
||||
.readable()
|
||||
.pausable({ strict: true })
|
||||
.validateOnExit()
|
||||
var checkListeners = function () {
|
||||
assert(stream.listeners('end').length < 10)
|
||||
if (!ended) {
|
||||
setImmediate(checkListeners)
|
||||
} else {
|
||||
done()
|
||||
}
|
||||
}
|
||||
checkListeners()
|
||||
})
|
||||
})
|
||||
15
packages/pg-query-stream/test/stream-tester.js
Normal file
15
packages/pg-query-stream/test/stream-tester.js
Normal file
@ -0,0 +1,15 @@
|
||||
var spec = require('stream-spec')
|
||||
|
||||
var QueryStream = require('../')
|
||||
|
||||
require('./helper')('stream tester', function (client) {
|
||||
it('passes stream spec', function (done) {
|
||||
var stream = new QueryStream('SELECT * FROM generate_series(0, 200) num', [])
|
||||
var query = client.query(stream)
|
||||
spec(query)
|
||||
.readable()
|
||||
.pausable({strict: true})
|
||||
.validateOnExit()
|
||||
stream.on('end', done)
|
||||
})
|
||||
})
|
||||
1627
packages/pg-query-stream/yarn.lock
Normal file
1627
packages/pg-query-stream/yarn.lock
Normal file
File diff suppressed because it is too large
Load Diff
Loading…
x
Reference in New Issue
Block a user