mirror of
https://github.com/brianc/node-postgres.git
synced 2026-01-18 15:55:05 +00:00
Add 'packages/pg-cursor/' from commit '492fbdbb65f6f33396d1017fa4cdbbb247dd3895'
git-subtree-dir: packages/pg-cursor git-subtree-mainline: ebb81dbfa635eca73d16d54b501f04c8d843bac5 git-subtree-split: 492fbdbb65f6f33396d1017fa4cdbbb247dd3895
This commit is contained in:
commit
37d15740ed
17
packages/pg-cursor/.eslintrc
Normal file
17
packages/pg-cursor/.eslintrc
Normal file
@ -0,0 +1,17 @@
|
||||
{
|
||||
"extends": ["eslint:recommended"],
|
||||
"parserOptions": {
|
||||
"ecmaVersion": 2017
|
||||
},
|
||||
"plugins": ["prettier"],
|
||||
"rules": {
|
||||
"prettier/prettier": "error",
|
||||
"prefer-const": "error",
|
||||
"no-var": "error"
|
||||
},
|
||||
"env": {
|
||||
"es6": true,
|
||||
"node": true,
|
||||
"mocha": true
|
||||
}
|
||||
}
|
||||
1
packages/pg-cursor/.gitignore
vendored
Normal file
1
packages/pg-cursor/.gitignore
vendored
Normal file
@ -0,0 +1 @@
|
||||
node_modules
|
||||
15
packages/pg-cursor/.travis.yml
Normal file
15
packages/pg-cursor/.travis.yml
Normal file
@ -0,0 +1,15 @@
|
||||
language: node_js
|
||||
dist: trusty
|
||||
sudo: false
|
||||
node_js:
|
||||
- '8'
|
||||
- '10'
|
||||
- '12'
|
||||
env:
|
||||
- PGUSER=postgres
|
||||
services:
|
||||
- postgresql
|
||||
addons:
|
||||
postgresql: '9.6'
|
||||
before_script:
|
||||
- psql -c 'create database travis;' -U postgres | true
|
||||
15
packages/pg-cursor/Makefile
Normal file
15
packages/pg-cursor/Makefile
Normal file
@ -0,0 +1,15 @@
|
||||
.PHONY: test
|
||||
test:
|
||||
npm test
|
||||
|
||||
.PHONY: patch
|
||||
patch: test
|
||||
npm version patch -m "Bump version"
|
||||
git push origin master --tags
|
||||
npm publish
|
||||
|
||||
.PHONY: minor
|
||||
minor: test
|
||||
npm version minor -m "Bump version"
|
||||
git push origin master --tags
|
||||
npm publish
|
||||
37
packages/pg-cursor/README.md
Normal file
37
packages/pg-cursor/README.md
Normal file
@ -0,0 +1,37 @@
|
||||
node-pg-cursor
|
||||
==============
|
||||
|
||||
Use a PostgreSQL result cursor from node with an easy to use API.
|
||||
|
||||
### install
|
||||
|
||||
```sh
|
||||
$ npm install pg-cursor
|
||||
```
|
||||
___note___: this depends on _either_ `npm install pg` or `npm install pg.js`, but you __must__ be using the pure JavaScript client. This will __not work__ with the native bindings.
|
||||
|
||||
### :star: [Documentation](https://node-postgres.com/api/cursor) :star:
|
||||
|
||||
### license
|
||||
|
||||
The MIT License (MIT)
|
||||
|
||||
Copyright (c) 2013 Brian M. Carlson
|
||||
|
||||
Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||
of this software and associated documentation files (the "Software"), to deal
|
||||
in the Software without restriction, including without limitation the rights
|
||||
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
||||
copies of the Software, and to permit persons to whom the Software is
|
||||
furnished to do so, subject to the following conditions:
|
||||
|
||||
The above copyright notice and this permission notice shall be included in
|
||||
all copies or substantial portions of the Software.
|
||||
|
||||
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||||
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||||
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
||||
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||||
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
||||
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
|
||||
THE SOFTWARE.
|
||||
218
packages/pg-cursor/index.js
Normal file
218
packages/pg-cursor/index.js
Normal file
@ -0,0 +1,218 @@
|
||||
'use strict'
|
||||
const Result = require('pg/lib/result.js')
|
||||
const prepare = require('pg/lib/utils.js').prepareValue
|
||||
const EventEmitter = require('events').EventEmitter
|
||||
const util = require('util')
|
||||
|
||||
let nextUniqueID = 1 // concept borrowed from org.postgresql.core.v3.QueryExecutorImpl
|
||||
|
||||
function Cursor(text, values, config) {
|
||||
EventEmitter.call(this)
|
||||
|
||||
this._conf = config || {}
|
||||
this.text = text
|
||||
this.values = values ? values.map(prepare) : null
|
||||
this.connection = null
|
||||
this._queue = []
|
||||
this.state = 'initialized'
|
||||
this._result = new Result(this._conf.rowMode, this._conf.types)
|
||||
this._cb = null
|
||||
this._rows = null
|
||||
this._portal = null
|
||||
this._ifNoData = this._ifNoData.bind(this)
|
||||
this._rowDescription = this._rowDescription.bind(this)
|
||||
}
|
||||
|
||||
util.inherits(Cursor, EventEmitter)
|
||||
|
||||
Cursor.prototype._ifNoData = function() {
|
||||
this.state = 'idle'
|
||||
this._shiftQueue()
|
||||
}
|
||||
|
||||
Cursor.prototype._rowDescription = function() {
|
||||
if (this.connection) {
|
||||
this.connection.removeListener('noData', this._ifNoData)
|
||||
}
|
||||
}
|
||||
|
||||
Cursor.prototype.submit = function(connection) {
|
||||
this.connection = connection
|
||||
this._portal = 'C_' + nextUniqueID++
|
||||
|
||||
const con = connection
|
||||
|
||||
con.parse(
|
||||
{
|
||||
text: this.text,
|
||||
},
|
||||
true
|
||||
)
|
||||
|
||||
con.bind(
|
||||
{
|
||||
portal: this._portal,
|
||||
values: this.values,
|
||||
},
|
||||
true
|
||||
)
|
||||
|
||||
con.describe(
|
||||
{
|
||||
type: 'P',
|
||||
name: this._portal, // AWS Redshift requires a portal name
|
||||
},
|
||||
true
|
||||
)
|
||||
|
||||
con.flush()
|
||||
|
||||
if (this._conf.types) {
|
||||
this._result._getTypeParser = this._conf.types.getTypeParser
|
||||
}
|
||||
|
||||
con.once('noData', this._ifNoData)
|
||||
con.once('rowDescription', this._rowDescription)
|
||||
}
|
||||
|
||||
Cursor.prototype._shiftQueue = function() {
|
||||
if (this._queue.length) {
|
||||
this._getRows.apply(this, this._queue.shift())
|
||||
}
|
||||
}
|
||||
|
||||
Cursor.prototype._closePortal = function() {
|
||||
// because we opened a named portal to stream results
|
||||
// we need to close the same named portal. Leaving a named portal
|
||||
// open can lock tables for modification if inside a transaction.
|
||||
// see https://github.com/brianc/node-pg-cursor/issues/56
|
||||
this.connection.close({ type: 'P', name: this._portal })
|
||||
this.connection.sync()
|
||||
}
|
||||
|
||||
Cursor.prototype.handleRowDescription = function(msg) {
|
||||
this._result.addFields(msg.fields)
|
||||
this.state = 'idle'
|
||||
this._shiftQueue()
|
||||
}
|
||||
|
||||
Cursor.prototype.handleDataRow = function(msg) {
|
||||
const row = this._result.parseRow(msg.fields)
|
||||
this.emit('row', row, this._result)
|
||||
this._rows.push(row)
|
||||
}
|
||||
|
||||
Cursor.prototype._sendRows = function() {
|
||||
this.state = 'idle'
|
||||
setImmediate(() => {
|
||||
const cb = this._cb
|
||||
// remove callback before calling it
|
||||
// because likely a new one will be added
|
||||
// within the call to this callback
|
||||
this._cb = null
|
||||
if (cb) {
|
||||
this._result.rows = this._rows
|
||||
cb(null, this._rows, this._result)
|
||||
}
|
||||
this._rows = []
|
||||
})
|
||||
}
|
||||
|
||||
Cursor.prototype.handleCommandComplete = function(msg) {
|
||||
this._result.addCommandComplete(msg)
|
||||
this._closePortal()
|
||||
}
|
||||
|
||||
Cursor.prototype.handlePortalSuspended = function() {
|
||||
this._sendRows()
|
||||
}
|
||||
|
||||
Cursor.prototype.handleReadyForQuery = function() {
|
||||
this._sendRows()
|
||||
this.state = 'done'
|
||||
this.emit('end', this._result)
|
||||
}
|
||||
|
||||
Cursor.prototype.handleEmptyQuery = function() {
|
||||
this.connection.sync()
|
||||
}
|
||||
|
||||
Cursor.prototype.handleError = function(msg) {
|
||||
this.connection.removeListener('noData', this._ifNoData)
|
||||
this.connection.removeListener('rowDescription', this._rowDescription)
|
||||
this.state = 'error'
|
||||
this._error = msg
|
||||
// satisfy any waiting callback
|
||||
if (this._cb) {
|
||||
this._cb(msg)
|
||||
}
|
||||
// dispatch error to all waiting callbacks
|
||||
for (let i = 0; i < this._queue.length; i++) {
|
||||
this._queue.pop()[1](msg)
|
||||
}
|
||||
|
||||
if (this.listenerCount('error') > 0) {
|
||||
// only dispatch error events if we have a listener
|
||||
this.emit('error', msg)
|
||||
}
|
||||
// call sync to keep this connection from hanging
|
||||
this.connection.sync()
|
||||
}
|
||||
|
||||
Cursor.prototype._getRows = function(rows, cb) {
|
||||
this.state = 'busy'
|
||||
this._cb = cb
|
||||
this._rows = []
|
||||
const msg = {
|
||||
portal: this._portal,
|
||||
rows: rows,
|
||||
}
|
||||
this.connection.execute(msg, true)
|
||||
this.connection.flush()
|
||||
}
|
||||
|
||||
// users really shouldn't be calling 'end' here and terminating a connection to postgres
|
||||
// via the low level connection.end api
|
||||
Cursor.prototype.end = util.deprecate(function(cb) {
|
||||
if (this.state !== 'initialized') {
|
||||
this.connection.sync()
|
||||
}
|
||||
this.connection.once('end', cb)
|
||||
this.connection.end()
|
||||
}, 'Cursor.end is deprecated. Call end on the client itself to end a connection to the database.')
|
||||
|
||||
Cursor.prototype.close = function(cb) {
|
||||
if (this.state === 'done') {
|
||||
if (cb) {
|
||||
return setImmediate(cb)
|
||||
} else {
|
||||
return
|
||||
}
|
||||
}
|
||||
this._closePortal()
|
||||
this.state = 'done'
|
||||
if (cb) {
|
||||
this.connection.once('closeComplete', function() {
|
||||
cb()
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
Cursor.prototype.read = function(rows, cb) {
|
||||
if (this.state === 'idle') {
|
||||
return this._getRows(rows, cb)
|
||||
}
|
||||
if (this.state === 'busy' || this.state === 'initialized') {
|
||||
return this._queue.push([rows, cb])
|
||||
}
|
||||
if (this.state === 'error') {
|
||||
return setImmediate(() => cb(this._error))
|
||||
}
|
||||
if (this.state === 'done') {
|
||||
return setImmediate(() => cb(null, []))
|
||||
} else {
|
||||
throw new Error('Unknown state: ' + this.state)
|
||||
}
|
||||
}
|
||||
|
||||
module.exports = Cursor
|
||||
32
packages/pg-cursor/package.json
Normal file
32
packages/pg-cursor/package.json
Normal file
@ -0,0 +1,32 @@
|
||||
{
|
||||
"name": "pg-cursor",
|
||||
"version": "2.0.1",
|
||||
"description": "Query cursor extension for node-postgres",
|
||||
"main": "index.js",
|
||||
"directories": {
|
||||
"test": "test"
|
||||
},
|
||||
"scripts": {
|
||||
"test": " mocha && eslint ."
|
||||
},
|
||||
"repository": {
|
||||
"type": "git",
|
||||
"url": "git://github.com/brianc/node-pg-cursor.git"
|
||||
},
|
||||
"author": "Brian M. Carlson",
|
||||
"license": "MIT",
|
||||
"devDependencies": {
|
||||
"eslint": "^6.5.1",
|
||||
"eslint-config-prettier": "^6.4.0",
|
||||
"eslint-plugin-prettier": "^3.1.1",
|
||||
"mocha": "^6.2.2",
|
||||
"pg": "7.x",
|
||||
"prettier": "^1.18.2"
|
||||
},
|
||||
"prettier": {
|
||||
"semi": false,
|
||||
"printWidth": 120,
|
||||
"trailingComma": "es5",
|
||||
"singleQuote": true
|
||||
}
|
||||
}
|
||||
45
packages/pg-cursor/test/close.js
Normal file
45
packages/pg-cursor/test/close.js
Normal file
@ -0,0 +1,45 @@
|
||||
const assert = require('assert')
|
||||
const Cursor = require('../')
|
||||
const pg = require('pg')
|
||||
|
||||
const text = 'SELECT generate_series as num FROM generate_series(0, 50)'
|
||||
describe('close', function() {
|
||||
beforeEach(function(done) {
|
||||
const client = (this.client = new pg.Client())
|
||||
client.connect(done)
|
||||
client.on('drain', client.end.bind(client))
|
||||
})
|
||||
|
||||
it('can close a finished cursor without a callback', function(done) {
|
||||
const cursor = new Cursor(text)
|
||||
this.client.query(cursor)
|
||||
this.client.query('SELECT NOW()', done)
|
||||
cursor.read(100, function(err) {
|
||||
assert.ifError(err)
|
||||
cursor.close()
|
||||
})
|
||||
})
|
||||
|
||||
it('closes cursor early', function(done) {
|
||||
const cursor = new Cursor(text)
|
||||
this.client.query(cursor)
|
||||
this.client.query('SELECT NOW()', done)
|
||||
cursor.read(25, function(err) {
|
||||
assert.ifError(err)
|
||||
cursor.close()
|
||||
})
|
||||
})
|
||||
|
||||
it('works with callback style', function(done) {
|
||||
const cursor = new Cursor(text)
|
||||
const client = this.client
|
||||
client.query(cursor)
|
||||
cursor.read(25, function(err) {
|
||||
assert.ifError(err)
|
||||
cursor.close(function(err) {
|
||||
assert.ifError(err)
|
||||
client.query('SELECT NOW()', done)
|
||||
})
|
||||
})
|
||||
})
|
||||
})
|
||||
86
packages/pg-cursor/test/error-handling.js
Normal file
86
packages/pg-cursor/test/error-handling.js
Normal file
@ -0,0 +1,86 @@
|
||||
'use strict'
|
||||
const assert = require('assert')
|
||||
const Cursor = require('../')
|
||||
const pg = require('pg')
|
||||
|
||||
const text = 'SELECT generate_series as num FROM generate_series(0, 4)'
|
||||
|
||||
describe('error handling', function() {
|
||||
it('can continue after error', function(done) {
|
||||
const client = new pg.Client()
|
||||
client.connect()
|
||||
const cursor = client.query(new Cursor('asdfdffsdf'))
|
||||
cursor.read(1, function(err) {
|
||||
assert(err)
|
||||
client.query('SELECT NOW()', function(err) {
|
||||
assert.ifError(err)
|
||||
client.end()
|
||||
done()
|
||||
})
|
||||
})
|
||||
})
|
||||
})
|
||||
|
||||
describe('read callback does not fire sync', () => {
|
||||
it('does not fire error callback sync', done => {
|
||||
const client = new pg.Client()
|
||||
client.connect()
|
||||
const cursor = client.query(new Cursor('asdfdffsdf'))
|
||||
let after = false
|
||||
cursor.read(1, function(err) {
|
||||
assert(err, 'error should be returned')
|
||||
assert.equal(after, true, 'should not call read sync')
|
||||
after = false
|
||||
cursor.read(1, function(err) {
|
||||
assert(err, 'error should be returned')
|
||||
assert.equal(after, true, 'should not call read sync')
|
||||
client.end()
|
||||
done()
|
||||
})
|
||||
after = true
|
||||
})
|
||||
after = true
|
||||
})
|
||||
|
||||
it('does not fire result sync after finished', done => {
|
||||
const client = new pg.Client()
|
||||
client.connect()
|
||||
const cursor = client.query(new Cursor('SELECT NOW()'))
|
||||
let after = false
|
||||
cursor.read(1, function(err) {
|
||||
assert(!err)
|
||||
assert.equal(after, true, 'should not call read sync')
|
||||
cursor.read(1, function(err) {
|
||||
assert(!err)
|
||||
after = false
|
||||
cursor.read(1, function(err) {
|
||||
assert(!err)
|
||||
assert.equal(after, true, 'should not call read sync')
|
||||
client.end()
|
||||
done()
|
||||
})
|
||||
after = true
|
||||
})
|
||||
})
|
||||
after = true
|
||||
})
|
||||
})
|
||||
|
||||
describe('proper cleanup', function() {
|
||||
it('can issue multiple cursors on one client', function(done) {
|
||||
const client = new pg.Client()
|
||||
client.connect()
|
||||
const cursor1 = client.query(new Cursor(text))
|
||||
cursor1.read(8, function(err, rows) {
|
||||
assert.ifError(err)
|
||||
assert.equal(rows.length, 5)
|
||||
const cursor2 = client.query(new Cursor(text))
|
||||
cursor2.read(8, function(err, rows) {
|
||||
assert.ifError(err)
|
||||
assert.equal(rows.length, 5)
|
||||
client.end()
|
||||
done()
|
||||
})
|
||||
})
|
||||
})
|
||||
})
|
||||
178
packages/pg-cursor/test/index.js
Normal file
178
packages/pg-cursor/test/index.js
Normal file
@ -0,0 +1,178 @@
|
||||
const assert = require('assert')
|
||||
const Cursor = require('../')
|
||||
const pg = require('pg')
|
||||
|
||||
const text = 'SELECT generate_series as num FROM generate_series(0, 5)'
|
||||
|
||||
describe('cursor', function() {
|
||||
beforeEach(function(done) {
|
||||
const client = (this.client = new pg.Client())
|
||||
client.connect(done)
|
||||
|
||||
this.pgCursor = function(text, values) {
|
||||
return client.query(new Cursor(text, values || []))
|
||||
}
|
||||
})
|
||||
|
||||
afterEach(function() {
|
||||
this.client.end()
|
||||
})
|
||||
|
||||
it('fetch 6 when asking for 10', function(done) {
|
||||
const cursor = this.pgCursor(text)
|
||||
cursor.read(10, function(err, res) {
|
||||
assert.ifError(err)
|
||||
assert.equal(res.length, 6)
|
||||
done()
|
||||
})
|
||||
})
|
||||
|
||||
it('end before reading to end', function(done) {
|
||||
const cursor = this.pgCursor(text)
|
||||
cursor.read(3, function(err, res) {
|
||||
assert.ifError(err)
|
||||
assert.equal(res.length, 3)
|
||||
done()
|
||||
})
|
||||
})
|
||||
|
||||
it('callback with error', function(done) {
|
||||
const cursor = this.pgCursor('select asdfasdf')
|
||||
cursor.read(1, function(err) {
|
||||
assert(err)
|
||||
done()
|
||||
})
|
||||
})
|
||||
|
||||
it('read a partial chunk of data', function(done) {
|
||||
const cursor = this.pgCursor(text)
|
||||
cursor.read(2, function(err, res) {
|
||||
assert.ifError(err)
|
||||
assert.equal(res.length, 2)
|
||||
cursor.read(3, function(err, res) {
|
||||
assert(!err)
|
||||
assert.equal(res.length, 3)
|
||||
cursor.read(1, function(err, res) {
|
||||
assert(!err)
|
||||
assert.equal(res.length, 1)
|
||||
cursor.read(1, function(err, res) {
|
||||
assert(!err)
|
||||
assert.ifError(err)
|
||||
assert.strictEqual(res.length, 0)
|
||||
done()
|
||||
})
|
||||
})
|
||||
})
|
||||
})
|
||||
})
|
||||
|
||||
it('read return length 0 past the end', function(done) {
|
||||
const cursor = this.pgCursor(text)
|
||||
cursor.read(2, function(err) {
|
||||
assert(!err)
|
||||
cursor.read(100, function(err, res) {
|
||||
assert(!err)
|
||||
assert.equal(res.length, 4)
|
||||
cursor.read(100, function(err, res) {
|
||||
assert(!err)
|
||||
assert.equal(res.length, 0)
|
||||
done()
|
||||
})
|
||||
})
|
||||
})
|
||||
})
|
||||
|
||||
it('read huge result', function(done) {
|
||||
this.timeout(10000)
|
||||
const text = 'SELECT generate_series as num FROM generate_series(0, 100000)'
|
||||
const values = []
|
||||
const cursor = this.pgCursor(text, values)
|
||||
let count = 0
|
||||
const read = function() {
|
||||
cursor.read(100, function(err, rows) {
|
||||
if (err) return done(err)
|
||||
if (!rows.length) {
|
||||
assert.equal(count, 100001)
|
||||
return done()
|
||||
}
|
||||
count += rows.length
|
||||
if (count % 10000 === 0) {
|
||||
// console.log(count)
|
||||
}
|
||||
setImmediate(read)
|
||||
})
|
||||
}
|
||||
read()
|
||||
})
|
||||
|
||||
it('normalizes parameter values', function(done) {
|
||||
const text = 'SELECT $1::json me'
|
||||
const values = [{ name: 'brian' }]
|
||||
const cursor = this.pgCursor(text, values)
|
||||
cursor.read(1, function(err, rows) {
|
||||
if (err) return done(err)
|
||||
assert.equal(rows[0].me.name, 'brian')
|
||||
cursor.read(1, function(err, rows) {
|
||||
assert(!err)
|
||||
assert.equal(rows.length, 0)
|
||||
done()
|
||||
})
|
||||
})
|
||||
})
|
||||
|
||||
it('returns result along with rows', function(done) {
|
||||
const cursor = this.pgCursor(text)
|
||||
cursor.read(1, function(err, rows, result) {
|
||||
assert.ifError(err)
|
||||
assert.equal(rows.length, 1)
|
||||
assert.strictEqual(rows, result.rows)
|
||||
assert.deepEqual(result.fields.map(f => f.name), ['num'])
|
||||
done()
|
||||
})
|
||||
})
|
||||
|
||||
it('emits row events', function(done) {
|
||||
const cursor = this.pgCursor(text)
|
||||
cursor.read(10)
|
||||
cursor.on('row', (row, result) => result.addRow(row))
|
||||
cursor.on('end', result => {
|
||||
assert.equal(result.rows.length, 6)
|
||||
done()
|
||||
})
|
||||
})
|
||||
|
||||
it('emits row events when cursor is closed manually', function(done) {
|
||||
const cursor = this.pgCursor(text)
|
||||
cursor.on('row', (row, result) => result.addRow(row))
|
||||
cursor.on('end', result => {
|
||||
assert.equal(result.rows.length, 3)
|
||||
done()
|
||||
})
|
||||
|
||||
cursor.read(3, () => cursor.close())
|
||||
})
|
||||
|
||||
it('emits error events', function(done) {
|
||||
const cursor = this.pgCursor('select asdfasdf')
|
||||
cursor.on('error', function(err) {
|
||||
assert(err)
|
||||
done()
|
||||
})
|
||||
})
|
||||
|
||||
it('returns rowCount on insert', function(done) {
|
||||
const pgCursor = this.pgCursor
|
||||
this.client
|
||||
.query('CREATE TEMPORARY TABLE pg_cursor_test (foo VARCHAR(1), bar VARCHAR(1))')
|
||||
.then(function() {
|
||||
const cursor = pgCursor('insert into pg_cursor_test values($1, $2)', ['a', 'b'])
|
||||
cursor.read(1, function(err, rows, result) {
|
||||
assert.ifError(err)
|
||||
assert.equal(rows.length, 0)
|
||||
assert.equal(result.rowCount, 1)
|
||||
done()
|
||||
})
|
||||
})
|
||||
.catch(done)
|
||||
})
|
||||
})
|
||||
3
packages/pg-cursor/test/mocha.opts
Normal file
3
packages/pg-cursor/test/mocha.opts
Normal file
@ -0,0 +1,3 @@
|
||||
--reporter spec
|
||||
--no-exit
|
||||
--bail
|
||||
34
packages/pg-cursor/test/no-data-handling.js
Normal file
34
packages/pg-cursor/test/no-data-handling.js
Normal file
@ -0,0 +1,34 @@
|
||||
const assert = require('assert')
|
||||
const pg = require('pg')
|
||||
const Cursor = require('../')
|
||||
|
||||
describe('queries with no data', function() {
|
||||
beforeEach(function(done) {
|
||||
const client = (this.client = new pg.Client())
|
||||
client.connect(done)
|
||||
})
|
||||
|
||||
afterEach(function() {
|
||||
this.client.end()
|
||||
})
|
||||
|
||||
it('handles queries that return no data', function(done) {
|
||||
const cursor = new Cursor('CREATE TEMPORARY TABLE whatwhat (thing int)')
|
||||
this.client.query(cursor)
|
||||
cursor.read(100, function(err, rows) {
|
||||
assert.ifError(err)
|
||||
assert.equal(rows.length, 0)
|
||||
done()
|
||||
})
|
||||
})
|
||||
|
||||
it('handles empty query', function(done) {
|
||||
let cursor = new Cursor('-- this is a comment')
|
||||
cursor = this.client.query(cursor)
|
||||
cursor.read(100, function(err, rows) {
|
||||
assert.ifError(err)
|
||||
assert.equal(rows.length, 0)
|
||||
done()
|
||||
})
|
||||
})
|
||||
})
|
||||
107
packages/pg-cursor/test/pool.js
Normal file
107
packages/pg-cursor/test/pool.js
Normal file
@ -0,0 +1,107 @@
|
||||
'use strict'
|
||||
const assert = require('assert')
|
||||
const Cursor = require('../')
|
||||
const pg = require('pg')
|
||||
|
||||
const text = 'SELECT generate_series as num FROM generate_series(0, 50)'
|
||||
|
||||
function poolQueryPromise(pool, readRowCount) {
|
||||
return new Promise((resolve, reject) => {
|
||||
pool.connect((err, client, done) => {
|
||||
if (err) {
|
||||
done(err)
|
||||
return reject(err)
|
||||
}
|
||||
const cursor = client.query(new Cursor(text))
|
||||
cursor.read(readRowCount, err => {
|
||||
if (err) {
|
||||
done(err)
|
||||
return reject(err)
|
||||
}
|
||||
cursor.close(err => {
|
||||
if (err) {
|
||||
done(err)
|
||||
return reject(err)
|
||||
}
|
||||
done()
|
||||
resolve()
|
||||
})
|
||||
})
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
describe('pool', function() {
|
||||
beforeEach(function() {
|
||||
this.pool = new pg.Pool({ max: 1 })
|
||||
})
|
||||
|
||||
afterEach(function() {
|
||||
this.pool.end()
|
||||
})
|
||||
|
||||
it('closes cursor early, single pool query', function(done) {
|
||||
poolQueryPromise(this.pool, 25)
|
||||
.then(() => done())
|
||||
.catch(err => {
|
||||
assert.ifError(err)
|
||||
done()
|
||||
})
|
||||
})
|
||||
|
||||
it('closes cursor early, saturated pool', function(done) {
|
||||
const promises = []
|
||||
for (let i = 0; i < 10; i++) {
|
||||
promises.push(poolQueryPromise(this.pool, 25))
|
||||
}
|
||||
Promise.all(promises)
|
||||
.then(() => done())
|
||||
.catch(err => {
|
||||
assert.ifError(err)
|
||||
done()
|
||||
})
|
||||
})
|
||||
|
||||
it('closes exhausted cursor, single pool query', function(done) {
|
||||
poolQueryPromise(this.pool, 100)
|
||||
.then(() => done())
|
||||
.catch(err => {
|
||||
assert.ifError(err)
|
||||
done()
|
||||
})
|
||||
})
|
||||
|
||||
it('closes exhausted cursor, saturated pool', function(done) {
|
||||
const promises = []
|
||||
for (let i = 0; i < 10; i++) {
|
||||
promises.push(poolQueryPromise(this.pool, 100))
|
||||
}
|
||||
Promise.all(promises)
|
||||
.then(() => done())
|
||||
.catch(err => {
|
||||
assert.ifError(err)
|
||||
done()
|
||||
})
|
||||
})
|
||||
|
||||
it('can close multiple times on a pool', async function() {
|
||||
const pool = new pg.Pool({ max: 1 })
|
||||
const run = async () => {
|
||||
const cursor = new Cursor(text)
|
||||
const client = await pool.connect()
|
||||
client.query(cursor)
|
||||
new Promise(resolve => {
|
||||
cursor.read(25, function(err) {
|
||||
assert.ifError(err)
|
||||
cursor.close(function(err) {
|
||||
assert.ifError(err)
|
||||
client.release()
|
||||
resolve()
|
||||
})
|
||||
})
|
||||
})
|
||||
}
|
||||
await Promise.all([run(), run(), run()])
|
||||
await pool.end()
|
||||
})
|
||||
})
|
||||
35
packages/pg-cursor/test/query-config.js
Normal file
35
packages/pg-cursor/test/query-config.js
Normal file
@ -0,0 +1,35 @@
|
||||
'use strict'
|
||||
const assert = require('assert')
|
||||
const Cursor = require('../')
|
||||
const pg = require('pg')
|
||||
|
||||
describe('query config passed to result', () => {
|
||||
it('passes rowMode to result', done => {
|
||||
const client = new pg.Client()
|
||||
client.connect()
|
||||
const text = 'SELECT generate_series as num FROM generate_series(0, 5)'
|
||||
const cursor = client.query(new Cursor(text, null, { rowMode: 'array' }))
|
||||
cursor.read(10, (err, rows) => {
|
||||
assert(!err)
|
||||
assert.deepEqual(rows, [[0], [1], [2], [3], [4], [5]])
|
||||
client.end()
|
||||
done()
|
||||
})
|
||||
})
|
||||
|
||||
it('passes types to result', done => {
|
||||
const client = new pg.Client()
|
||||
client.connect()
|
||||
const text = 'SELECT generate_series as num FROM generate_series(0, 2)'
|
||||
const types = {
|
||||
getTypeParser: () => () => 'foo',
|
||||
}
|
||||
const cursor = client.query(new Cursor(text, null, { types }))
|
||||
cursor.read(10, (err, rows) => {
|
||||
assert(!err)
|
||||
assert.deepEqual(rows, [{ num: 'foo' }, { num: 'foo' }, { num: 'foo' }])
|
||||
client.end()
|
||||
done()
|
||||
})
|
||||
})
|
||||
})
|
||||
43
packages/pg-cursor/test/transactions.js
Normal file
43
packages/pg-cursor/test/transactions.js
Normal file
@ -0,0 +1,43 @@
|
||||
const assert = require('assert')
|
||||
const Cursor = require('../')
|
||||
const pg = require('pg')
|
||||
|
||||
describe('transactions', () => {
|
||||
it('can execute multiple statements in a transaction', async () => {
|
||||
const client = new pg.Client()
|
||||
await client.connect()
|
||||
await client.query('begin')
|
||||
await client.query('CREATE TEMP TABLE foobar(id SERIAL PRIMARY KEY)')
|
||||
const cursor = client.query(new Cursor('SELECT * FROM foobar'))
|
||||
const rows = await new Promise((resolve, reject) => {
|
||||
cursor.read(10, (err, rows) => (err ? reject(err) : resolve(rows)))
|
||||
})
|
||||
assert.equal(rows.length, 0)
|
||||
await client.query('ALTER TABLE foobar ADD COLUMN name TEXT')
|
||||
await client.end()
|
||||
})
|
||||
|
||||
it('can execute multiple statements in a transaction if ending cursor early', async () => {
|
||||
const client = new pg.Client()
|
||||
await client.connect()
|
||||
await client.query('begin')
|
||||
await client.query('CREATE TEMP TABLE foobar(id SERIAL PRIMARY KEY)')
|
||||
const cursor = client.query(new Cursor('SELECT * FROM foobar'))
|
||||
await new Promise(resolve => cursor.close(resolve))
|
||||
await client.query('ALTER TABLE foobar ADD COLUMN name TEXT')
|
||||
await client.end()
|
||||
})
|
||||
|
||||
it('can execute multiple statements in a transaction if no data', async () => {
|
||||
const client = new pg.Client()
|
||||
await client.connect()
|
||||
await client.query('begin')
|
||||
// create a cursor that has no data response
|
||||
const createText = 'CREATE TEMP TABLE foobar(id SERIAL PRIMARY KEY)'
|
||||
const cursor = client.query(new Cursor(createText))
|
||||
const err = await new Promise(resolve => cursor.read(100, resolve))
|
||||
assert.ifError(err)
|
||||
await client.query('ALTER TABLE foobar ADD COLUMN name TEXT')
|
||||
await client.end()
|
||||
})
|
||||
})
|
||||
1298
packages/pg-cursor/yarn.lock
Normal file
1298
packages/pg-cursor/yarn.lock
Normal file
File diff suppressed because it is too large
Load Diff
Loading…
x
Reference in New Issue
Block a user