Auto-fix pg-cursor

This commit is contained in:
Brian M. Carlson 2020-04-10 10:29:54 -05:00
parent a8471aa54b
commit 3002d5cbdd
8 changed files with 536 additions and 536 deletions

View File

@ -1,218 +1,218 @@
'use strict'
const Result = require('pg/lib/result.js')
const prepare = require('pg/lib/utils.js').prepareValue
const EventEmitter = require('events').EventEmitter
const util = require('util')
'use strict';
const Result = require('pg/lib/result.js');
const prepare = require('pg/lib/utils.js').prepareValue;
const EventEmitter = require('events').EventEmitter;
const util = require('util');
let nextUniqueID = 1 // concept borrowed from org.postgresql.core.v3.QueryExecutorImpl
let nextUniqueID = 1; // concept borrowed from org.postgresql.core.v3.QueryExecutorImpl
function Cursor(text, values, config) {
EventEmitter.call(this)
EventEmitter.call(this);
this._conf = config || {}
this.text = text
this.values = values ? values.map(prepare) : null
this.connection = null
this._queue = []
this.state = 'initialized'
this._result = new Result(this._conf.rowMode, this._conf.types)
this._cb = null
this._rows = null
this._portal = null
this._ifNoData = this._ifNoData.bind(this)
this._rowDescription = this._rowDescription.bind(this)
this._conf = config || {};
this.text = text;
this.values = values ? values.map(prepare) : null;
this.connection = null;
this._queue = [];
this.state = 'initialized';
this._result = new Result(this._conf.rowMode, this._conf.types);
this._cb = null;
this._rows = null;
this._portal = null;
this._ifNoData = this._ifNoData.bind(this);
this._rowDescription = this._rowDescription.bind(this);
}
util.inherits(Cursor, EventEmitter)
util.inherits(Cursor, EventEmitter);
Cursor.prototype._ifNoData = function () {
this.state = 'idle'
this._shiftQueue()
}
this.state = 'idle';
this._shiftQueue();
};
Cursor.prototype._rowDescription = function () {
if (this.connection) {
this.connection.removeListener('noData', this._ifNoData)
this.connection.removeListener('noData', this._ifNoData);
}
}
};
Cursor.prototype.submit = function (connection) {
this.connection = connection
this._portal = 'C_' + nextUniqueID++
this.connection = connection;
this._portal = 'C_' + nextUniqueID++;
const con = connection
const con = connection;
con.parse(
{
text: this.text
text: this.text,
},
true
)
);
con.bind(
{
portal: this._portal,
values: this.values
values: this.values,
},
true
)
);
con.describe(
{
type: 'P',
name: this._portal // AWS Redshift requires a portal name
name: this._portal, // AWS Redshift requires a portal name
},
true
)
);
con.flush()
con.flush();
if (this._conf.types) {
this._result._getTypeParser = this._conf.types.getTypeParser
this._result._getTypeParser = this._conf.types.getTypeParser;
}
con.once('noData', this._ifNoData)
con.once('rowDescription', this._rowDescription)
}
con.once('noData', this._ifNoData);
con.once('rowDescription', this._rowDescription);
};
Cursor.prototype._shiftQueue = function () {
if (this._queue.length) {
this._getRows.apply(this, this._queue.shift())
this._getRows.apply(this, this._queue.shift());
}
}
};
Cursor.prototype._closePortal = function () {
// because we opened a named portal to stream results
// we need to close the same named portal. Leaving a named portal
// open can lock tables for modification if inside a transaction.
// see https://github.com/brianc/node-pg-cursor/issues/56
this.connection.close({ type: 'P', name: this._portal })
this.connection.sync()
}
this.connection.close({ type: 'P', name: this._portal });
this.connection.sync();
};
Cursor.prototype.handleRowDescription = function (msg) {
this._result.addFields(msg.fields)
this.state = 'idle'
this._shiftQueue()
}
this._result.addFields(msg.fields);
this.state = 'idle';
this._shiftQueue();
};
Cursor.prototype.handleDataRow = function (msg) {
const row = this._result.parseRow(msg.fields)
this.emit('row', row, this._result)
this._rows.push(row)
}
const row = this._result.parseRow(msg.fields);
this.emit('row', row, this._result);
this._rows.push(row);
};
Cursor.prototype._sendRows = function () {
this.state = 'idle'
this.state = 'idle';
setImmediate(() => {
const cb = this._cb
const cb = this._cb;
// remove callback before calling it
// because likely a new one will be added
// within the call to this callback
this._cb = null
this._cb = null;
if (cb) {
this._result.rows = this._rows
cb(null, this._rows, this._result)
this._result.rows = this._rows;
cb(null, this._rows, this._result);
}
this._rows = []
})
}
this._rows = [];
});
};
Cursor.prototype.handleCommandComplete = function (msg) {
this._result.addCommandComplete(msg)
this._closePortal()
}
this._result.addCommandComplete(msg);
this._closePortal();
};
Cursor.prototype.handlePortalSuspended = function () {
this._sendRows()
}
this._sendRows();
};
Cursor.prototype.handleReadyForQuery = function () {
this._sendRows()
this.state = 'done'
this.emit('end', this._result)
}
this._sendRows();
this.state = 'done';
this.emit('end', this._result);
};
Cursor.prototype.handleEmptyQuery = function () {
this.connection.sync()
}
this.connection.sync();
};
Cursor.prototype.handleError = function (msg) {
this.connection.removeListener('noData', this._ifNoData)
this.connection.removeListener('rowDescription', this._rowDescription)
this.state = 'error'
this._error = msg
this.connection.removeListener('noData', this._ifNoData);
this.connection.removeListener('rowDescription', this._rowDescription);
this.state = 'error';
this._error = msg;
// satisfy any waiting callback
if (this._cb) {
this._cb(msg)
this._cb(msg);
}
// dispatch error to all waiting callbacks
for (let i = 0; i < this._queue.length; i++) {
this._queue.pop()[1](msg)
this._queue.pop()[1](msg);
}
if (this.listenerCount('error') > 0) {
// only dispatch error events if we have a listener
this.emit('error', msg)
this.emit('error', msg);
}
// call sync to keep this connection from hanging
this.connection.sync()
}
this.connection.sync();
};
Cursor.prototype._getRows = function (rows, cb) {
this.state = 'busy'
this._cb = cb
this._rows = []
this.state = 'busy';
this._cb = cb;
this._rows = [];
const msg = {
portal: this._portal,
rows: rows
}
this.connection.execute(msg, true)
this.connection.flush()
}
rows: rows,
};
this.connection.execute(msg, true);
this.connection.flush();
};
// users really shouldn't be calling 'end' here and terminating a connection to postgres
// via the low level connection.end api
Cursor.prototype.end = util.deprecate(function (cb) {
if (this.state !== 'initialized') {
this.connection.sync()
this.connection.sync();
}
this.connection.once('end', cb)
this.connection.end()
}, 'Cursor.end is deprecated. Call end on the client itself to end a connection to the database.')
this.connection.once('end', cb);
this.connection.end();
}, 'Cursor.end is deprecated. Call end on the client itself to end a connection to the database.');
Cursor.prototype.close = function (cb) {
if (!this.connection || this.state === 'done') {
if (cb) {
return setImmediate(cb)
return setImmediate(cb);
} else {
return
return;
}
}
this._closePortal()
this.state = 'done'
this._closePortal();
this.state = 'done';
if (cb) {
this.connection.once('readyForQuery', function () {
cb()
})
cb();
});
}
}
};
Cursor.prototype.read = function (rows, cb) {
if (this.state === 'idle') {
return this._getRows(rows, cb)
return this._getRows(rows, cb);
}
if (this.state === 'busy' || this.state === 'initialized') {
return this._queue.push([rows, cb])
return this._queue.push([rows, cb]);
}
if (this.state === 'error') {
return setImmediate(() => cb(this._error))
return setImmediate(() => cb(this._error));
}
if (this.state === 'done') {
return setImmediate(() => cb(null, []))
return setImmediate(() => cb(null, []));
} else {
throw new Error('Unknown state: ' + this.state)
throw new Error('Unknown state: ' + this.state);
}
}
};
module.exports = Cursor
module.exports = Cursor;

View File

@ -1,54 +1,54 @@
const assert = require('assert')
const Cursor = require('../')
const pg = require('pg')
const assert = require('assert');
const Cursor = require('../');
const pg = require('pg');
const text = 'SELECT generate_series as num FROM generate_series(0, 50)'
const text = 'SELECT generate_series as num FROM generate_series(0, 50)';
describe('close', function () {
beforeEach(function (done) {
const client = (this.client = new pg.Client())
client.connect(done)
})
const client = (this.client = new pg.Client());
client.connect(done);
});
this.afterEach(function (done) {
this.client.end(done)
})
this.client.end(done);
});
it('can close a finished cursor without a callback', function (done) {
const cursor = new Cursor(text)
this.client.query(cursor)
this.client.query('SELECT NOW()', done)
const cursor = new Cursor(text);
this.client.query(cursor);
this.client.query('SELECT NOW()', done);
cursor.read(100, function (err) {
assert.ifError(err)
cursor.close()
})
})
assert.ifError(err);
cursor.close();
});
});
it('closes cursor early', function (done) {
const cursor = new Cursor(text)
this.client.query(cursor)
this.client.query('SELECT NOW()', done)
const cursor = new Cursor(text);
this.client.query(cursor);
this.client.query('SELECT NOW()', done);
cursor.read(25, function (err) {
assert.ifError(err)
cursor.close()
})
})
assert.ifError(err);
cursor.close();
});
});
it('works with callback style', function (done) {
const cursor = new Cursor(text)
const client = this.client
client.query(cursor)
const cursor = new Cursor(text);
const client = this.client;
client.query(cursor);
cursor.read(25, function (err, rows) {
assert.ifError(err)
assert.strictEqual(rows.length, 25)
assert.ifError(err);
assert.strictEqual(rows.length, 25);
cursor.close(function (err) {
assert.ifError(err)
client.query('SELECT NOW()', done)
})
})
})
assert.ifError(err);
client.query('SELECT NOW()', done);
});
});
});
it('is a no-op to "close" the cursor before submitting it', function (done) {
const cursor = new Cursor(text)
cursor.close(done)
})
})
const cursor = new Cursor(text);
cursor.close(done);
});
});

View File

@ -1,86 +1,86 @@
'use strict'
const assert = require('assert')
const Cursor = require('../')
const pg = require('pg')
'use strict';
const assert = require('assert');
const Cursor = require('../');
const pg = require('pg');
const text = 'SELECT generate_series as num FROM generate_series(0, 4)'
const text = 'SELECT generate_series as num FROM generate_series(0, 4)';
describe('error handling', function() {
it('can continue after error', function(done) {
const client = new pg.Client()
client.connect()
const cursor = client.query(new Cursor('asdfdffsdf'))
cursor.read(1, function(err) {
assert(err)
client.query('SELECT NOW()', function(err) {
assert.ifError(err)
client.end()
done()
})
})
})
})
describe('error handling', function () {
it('can continue after error', function (done) {
const client = new pg.Client();
client.connect();
const cursor = client.query(new Cursor('asdfdffsdf'));
cursor.read(1, function (err) {
assert(err);
client.query('SELECT NOW()', function (err) {
assert.ifError(err);
client.end();
done();
});
});
});
});
describe('read callback does not fire sync', () => {
it('does not fire error callback sync', done => {
const client = new pg.Client()
client.connect()
const cursor = client.query(new Cursor('asdfdffsdf'))
let after = false
cursor.read(1, function(err) {
assert(err, 'error should be returned')
assert.strictEqual(after, true, 'should not call read sync')
after = false
cursor.read(1, function(err) {
assert(err, 'error should be returned')
assert.strictEqual(after, true, 'should not call read sync')
client.end()
done()
})
after = true
})
after = true
})
it('does not fire error callback sync', (done) => {
const client = new pg.Client();
client.connect();
const cursor = client.query(new Cursor('asdfdffsdf'));
let after = false;
cursor.read(1, function (err) {
assert(err, 'error should be returned');
assert.strictEqual(after, true, 'should not call read sync');
after = false;
cursor.read(1, function (err) {
assert(err, 'error should be returned');
assert.strictEqual(after, true, 'should not call read sync');
client.end();
done();
});
after = true;
});
after = true;
});
it('does not fire result sync after finished', done => {
const client = new pg.Client()
client.connect()
const cursor = client.query(new Cursor('SELECT NOW()'))
let after = false
cursor.read(1, function(err) {
assert(!err)
assert.strictEqual(after, true, 'should not call read sync')
cursor.read(1, function(err) {
assert(!err)
after = false
cursor.read(1, function(err) {
assert(!err)
assert.strictEqual(after, true, 'should not call read sync')
client.end()
done()
})
after = true
})
})
after = true
})
})
it('does not fire result sync after finished', (done) => {
const client = new pg.Client();
client.connect();
const cursor = client.query(new Cursor('SELECT NOW()'));
let after = false;
cursor.read(1, function (err) {
assert(!err);
assert.strictEqual(after, true, 'should not call read sync');
cursor.read(1, function (err) {
assert(!err);
after = false;
cursor.read(1, function (err) {
assert(!err);
assert.strictEqual(after, true, 'should not call read sync');
client.end();
done();
});
after = true;
});
});
after = true;
});
});
describe('proper cleanup', function() {
it('can issue multiple cursors on one client', function(done) {
const client = new pg.Client()
client.connect()
const cursor1 = client.query(new Cursor(text))
cursor1.read(8, function(err, rows) {
assert.ifError(err)
assert.strictEqual(rows.length, 5)
const cursor2 = client.query(new Cursor(text))
cursor2.read(8, function(err, rows) {
assert.ifError(err)
assert.strictEqual(rows.length, 5)
client.end()
done()
})
})
})
})
describe('proper cleanup', function () {
it('can issue multiple cursors on one client', function (done) {
const client = new pg.Client();
client.connect();
const cursor1 = client.query(new Cursor(text));
cursor1.read(8, function (err, rows) {
assert.ifError(err);
assert.strictEqual(rows.length, 5);
const cursor2 = client.query(new Cursor(text));
cursor2.read(8, function (err, rows) {
assert.ifError(err);
assert.strictEqual(rows.length, 5);
client.end();
done();
});
});
});
});

View File

@ -1,181 +1,181 @@
const assert = require('assert')
const Cursor = require('../')
const pg = require('pg')
const assert = require('assert');
const Cursor = require('../');
const pg = require('pg');
const text = 'SELECT generate_series as num FROM generate_series(0, 5)'
const text = 'SELECT generate_series as num FROM generate_series(0, 5)';
describe('cursor', function() {
beforeEach(function(done) {
const client = (this.client = new pg.Client())
client.connect(done)
describe('cursor', function () {
beforeEach(function (done) {
const client = (this.client = new pg.Client());
client.connect(done);
this.pgCursor = function(text, values) {
return client.query(new Cursor(text, values || []))
}
})
this.pgCursor = function (text, values) {
return client.query(new Cursor(text, values || []));
};
});
afterEach(function() {
this.client.end()
})
afterEach(function () {
this.client.end();
});
it('fetch 6 when asking for 10', function(done) {
const cursor = this.pgCursor(text)
cursor.read(10, function(err, res) {
assert.ifError(err)
assert.strictEqual(res.length, 6)
done()
})
})
it('fetch 6 when asking for 10', function (done) {
const cursor = this.pgCursor(text);
cursor.read(10, function (err, res) {
assert.ifError(err);
assert.strictEqual(res.length, 6);
done();
});
});
it('end before reading to end', function(done) {
const cursor = this.pgCursor(text)
cursor.read(3, function(err, res) {
assert.ifError(err)
assert.strictEqual(res.length, 3)
done()
})
})
it('end before reading to end', function (done) {
const cursor = this.pgCursor(text);
cursor.read(3, function (err, res) {
assert.ifError(err);
assert.strictEqual(res.length, 3);
done();
});
});
it('callback with error', function(done) {
const cursor = this.pgCursor('select asdfasdf')
cursor.read(1, function(err) {
assert(err)
done()
})
})
it('callback with error', function (done) {
const cursor = this.pgCursor('select asdfasdf');
cursor.read(1, function (err) {
assert(err);
done();
});
});
it('read a partial chunk of data', function(done) {
const cursor = this.pgCursor(text)
cursor.read(2, function(err, res) {
assert.ifError(err)
assert.strictEqual(res.length, 2)
cursor.read(3, function(err, res) {
assert(!err)
assert.strictEqual(res.length, 3)
cursor.read(1, function(err, res) {
assert(!err)
assert.strictEqual(res.length, 1)
cursor.read(1, function(err, res) {
assert(!err)
assert.ifError(err)
assert.strictEqual(res.length, 0)
done()
})
})
})
})
})
it('read a partial chunk of data', function (done) {
const cursor = this.pgCursor(text);
cursor.read(2, function (err, res) {
assert.ifError(err);
assert.strictEqual(res.length, 2);
cursor.read(3, function (err, res) {
assert(!err);
assert.strictEqual(res.length, 3);
cursor.read(1, function (err, res) {
assert(!err);
assert.strictEqual(res.length, 1);
cursor.read(1, function (err, res) {
assert(!err);
assert.ifError(err);
assert.strictEqual(res.length, 0);
done();
});
});
});
});
});
it('read return length 0 past the end', function(done) {
const cursor = this.pgCursor(text)
cursor.read(2, function(err) {
assert(!err)
cursor.read(100, function(err, res) {
assert(!err)
assert.strictEqual(res.length, 4)
cursor.read(100, function(err, res) {
assert(!err)
assert.strictEqual(res.length, 0)
done()
})
})
})
})
it('read return length 0 past the end', function (done) {
const cursor = this.pgCursor(text);
cursor.read(2, function (err) {
assert(!err);
cursor.read(100, function (err, res) {
assert(!err);
assert.strictEqual(res.length, 4);
cursor.read(100, function (err, res) {
assert(!err);
assert.strictEqual(res.length, 0);
done();
});
});
});
});
it('read huge result', function(done) {
this.timeout(10000)
const text = 'SELECT generate_series as num FROM generate_series(0, 100000)'
const values = []
const cursor = this.pgCursor(text, values)
let count = 0
const read = function() {
cursor.read(100, function(err, rows) {
if (err) return done(err)
it('read huge result', function (done) {
this.timeout(10000);
const text = 'SELECT generate_series as num FROM generate_series(0, 100000)';
const values = [];
const cursor = this.pgCursor(text, values);
let count = 0;
const read = function () {
cursor.read(100, function (err, rows) {
if (err) return done(err);
if (!rows.length) {
assert.strictEqual(count, 100001)
return done()
assert.strictEqual(count, 100001);
return done();
}
count += rows.length
count += rows.length;
if (count % 10000 === 0) {
// console.log(count)
}
setImmediate(read)
})
}
read()
})
setImmediate(read);
});
};
read();
});
it('normalizes parameter values', function(done) {
const text = 'SELECT $1::json me'
const values = [{ name: 'brian' }]
const cursor = this.pgCursor(text, values)
cursor.read(1, function(err, rows) {
if (err) return done(err)
assert.strictEqual(rows[0].me.name, 'brian')
cursor.read(1, function(err, rows) {
assert(!err)
assert.strictEqual(rows.length, 0)
done()
})
})
})
it('normalizes parameter values', function (done) {
const text = 'SELECT $1::json me';
const values = [{ name: 'brian' }];
const cursor = this.pgCursor(text, values);
cursor.read(1, function (err, rows) {
if (err) return done(err);
assert.strictEqual(rows[0].me.name, 'brian');
cursor.read(1, function (err, rows) {
assert(!err);
assert.strictEqual(rows.length, 0);
done();
});
});
});
it('returns result along with rows', function(done) {
const cursor = this.pgCursor(text)
cursor.read(1, function(err, rows, result) {
assert.ifError(err)
assert.strictEqual(rows.length, 1)
assert.strictEqual(rows, result.rows)
it('returns result along with rows', function (done) {
const cursor = this.pgCursor(text);
cursor.read(1, function (err, rows, result) {
assert.ifError(err);
assert.strictEqual(rows.length, 1);
assert.strictEqual(rows, result.rows);
assert.deepStrictEqual(
result.fields.map(f => f.name),
result.fields.map((f) => f.name),
['num']
)
done()
})
})
);
done();
});
});
it('emits row events', function(done) {
const cursor = this.pgCursor(text)
cursor.read(10)
cursor.on('row', (row, result) => result.addRow(row))
cursor.on('end', result => {
assert.strictEqual(result.rows.length, 6)
done()
})
})
it('emits row events', function (done) {
const cursor = this.pgCursor(text);
cursor.read(10);
cursor.on('row', (row, result) => result.addRow(row));
cursor.on('end', (result) => {
assert.strictEqual(result.rows.length, 6);
done();
});
});
it('emits row events when cursor is closed manually', function(done) {
const cursor = this.pgCursor(text)
cursor.on('row', (row, result) => result.addRow(row))
cursor.on('end', result => {
assert.strictEqual(result.rows.length, 3)
done()
})
it('emits row events when cursor is closed manually', function (done) {
const cursor = this.pgCursor(text);
cursor.on('row', (row, result) => result.addRow(row));
cursor.on('end', (result) => {
assert.strictEqual(result.rows.length, 3);
done();
});
cursor.read(3, () => cursor.close())
})
cursor.read(3, () => cursor.close());
});
it('emits error events', function(done) {
const cursor = this.pgCursor('select asdfasdf')
cursor.on('error', function(err) {
assert(err)
done()
})
})
it('emits error events', function (done) {
const cursor = this.pgCursor('select asdfasdf');
cursor.on('error', function (err) {
assert(err);
done();
});
});
it('returns rowCount on insert', function(done) {
const pgCursor = this.pgCursor
it('returns rowCount on insert', function (done) {
const pgCursor = this.pgCursor;
this.client
.query('CREATE TEMPORARY TABLE pg_cursor_test (foo VARCHAR(1), bar VARCHAR(1))')
.then(function() {
const cursor = pgCursor('insert into pg_cursor_test values($1, $2)', ['a', 'b'])
cursor.read(1, function(err, rows, result) {
assert.ifError(err)
assert.strictEqual(rows.length, 0)
assert.strictEqual(result.rowCount, 1)
done()
})
.then(function () {
const cursor = pgCursor('insert into pg_cursor_test values($1, $2)', ['a', 'b']);
cursor.read(1, function (err, rows, result) {
assert.ifError(err);
assert.strictEqual(rows.length, 0);
assert.strictEqual(result.rowCount, 1);
done();
});
})
.catch(done)
})
})
.catch(done);
});
});

View File

@ -1,34 +1,34 @@
const assert = require('assert')
const pg = require('pg')
const Cursor = require('../')
const assert = require('assert');
const pg = require('pg');
const Cursor = require('../');
describe('queries with no data', function() {
beforeEach(function(done) {
const client = (this.client = new pg.Client())
client.connect(done)
})
describe('queries with no data', function () {
beforeEach(function (done) {
const client = (this.client = new pg.Client());
client.connect(done);
});
afterEach(function() {
this.client.end()
})
afterEach(function () {
this.client.end();
});
it('handles queries that return no data', function(done) {
const cursor = new Cursor('CREATE TEMPORARY TABLE whatwhat (thing int)')
this.client.query(cursor)
cursor.read(100, function(err, rows) {
assert.ifError(err)
assert.strictEqual(rows.length, 0)
done()
})
})
it('handles queries that return no data', function (done) {
const cursor = new Cursor('CREATE TEMPORARY TABLE whatwhat (thing int)');
this.client.query(cursor);
cursor.read(100, function (err, rows) {
assert.ifError(err);
assert.strictEqual(rows.length, 0);
done();
});
});
it('handles empty query', function(done) {
let cursor = new Cursor('-- this is a comment')
cursor = this.client.query(cursor)
cursor.read(100, function(err, rows) {
assert.ifError(err)
assert.strictEqual(rows.length, 0)
done()
})
})
})
it('handles empty query', function (done) {
let cursor = new Cursor('-- this is a comment');
cursor = this.client.query(cursor);
cursor.read(100, function (err, rows) {
assert.ifError(err);
assert.strictEqual(rows.length, 0);
done();
});
});
});

View File

@ -1,107 +1,107 @@
'use strict'
const assert = require('assert')
const Cursor = require('../')
const pg = require('pg')
'use strict';
const assert = require('assert');
const Cursor = require('../');
const pg = require('pg');
const text = 'SELECT generate_series as num FROM generate_series(0, 50)'
const text = 'SELECT generate_series as num FROM generate_series(0, 50)';
function poolQueryPromise (pool, readRowCount) {
function poolQueryPromise(pool, readRowCount) {
return new Promise((resolve, reject) => {
pool.connect((err, client, done) => {
if (err) {
done(err)
return reject(err)
done(err);
return reject(err);
}
const cursor = client.query(new Cursor(text))
cursor.read(readRowCount, err => {
const cursor = client.query(new Cursor(text));
cursor.read(readRowCount, (err) => {
if (err) {
done(err)
return reject(err)
done(err);
return reject(err);
}
cursor.close(err => {
cursor.close((err) => {
if (err) {
done(err)
return reject(err)
done(err);
return reject(err);
}
done()
resolve()
})
})
})
})
done();
resolve();
});
});
});
});
}
describe('pool', function () {
beforeEach(function () {
this.pool = new pg.Pool({ max: 1 })
})
this.pool = new pg.Pool({ max: 1 });
});
afterEach(function () {
this.pool.end()
})
this.pool.end();
});
it('closes cursor early, single pool query', function (done) {
poolQueryPromise(this.pool, 25)
.then(() => done())
.catch(err => {
assert.ifError(err)
done()
})
})
.catch((err) => {
assert.ifError(err);
done();
});
});
it('closes cursor early, saturated pool', function (done) {
const promises = []
const promises = [];
for (let i = 0; i < 10; i++) {
promises.push(poolQueryPromise(this.pool, 25))
promises.push(poolQueryPromise(this.pool, 25));
}
Promise.all(promises)
.then(() => done())
.catch(err => {
assert.ifError(err)
done()
})
})
.catch((err) => {
assert.ifError(err);
done();
});
});
it('closes exhausted cursor, single pool query', function (done) {
poolQueryPromise(this.pool, 100)
.then(() => done())
.catch(err => {
assert.ifError(err)
done()
})
})
.catch((err) => {
assert.ifError(err);
done();
});
});
it('closes exhausted cursor, saturated pool', function (done) {
const promises = []
const promises = [];
for (let i = 0; i < 10; i++) {
promises.push(poolQueryPromise(this.pool, 100))
promises.push(poolQueryPromise(this.pool, 100));
}
Promise.all(promises)
.then(() => done())
.catch(err => {
assert.ifError(err)
done()
})
})
.catch((err) => {
assert.ifError(err);
done();
});
});
it('can close multiple times on a pool', async function () {
const pool = new pg.Pool({ max: 1 })
const pool = new pg.Pool({ max: 1 });
const run = async () => {
const cursor = new Cursor(text)
const client = await pool.connect()
client.query(cursor)
await new Promise(resolve => {
const cursor = new Cursor(text);
const client = await pool.connect();
client.query(cursor);
await new Promise((resolve) => {
cursor.read(25, function (err) {
assert.ifError(err)
assert.ifError(err);
cursor.close(function (err) {
assert.ifError(err)
client.release()
resolve()
})
})
})
}
await Promise.all([run(), run(), run()])
await pool.end()
})
})
assert.ifError(err);
client.release();
resolve();
});
});
});
};
await Promise.all([run(), run(), run()]);
await pool.end();
});
});

View File

@ -1,35 +1,35 @@
'use strict'
const assert = require('assert')
const Cursor = require('../')
const pg = require('pg')
'use strict';
const assert = require('assert');
const Cursor = require('../');
const pg = require('pg');
describe('query config passed to result', () => {
it('passes rowMode to result', done => {
const client = new pg.Client()
client.connect()
const text = 'SELECT generate_series as num FROM generate_series(0, 5)'
const cursor = client.query(new Cursor(text, null, { rowMode: 'array' }))
it('passes rowMode to result', (done) => {
const client = new pg.Client();
client.connect();
const text = 'SELECT generate_series as num FROM generate_series(0, 5)';
const cursor = client.query(new Cursor(text, null, { rowMode: 'array' }));
cursor.read(10, (err, rows) => {
assert(!err)
assert.deepStrictEqual(rows, [[0], [1], [2], [3], [4], [5]])
client.end()
done()
})
})
assert(!err);
assert.deepStrictEqual(rows, [[0], [1], [2], [3], [4], [5]]);
client.end();
done();
});
});
it('passes types to result', done => {
const client = new pg.Client()
client.connect()
const text = 'SELECT generate_series as num FROM generate_series(0, 2)'
it('passes types to result', (done) => {
const client = new pg.Client();
client.connect();
const text = 'SELECT generate_series as num FROM generate_series(0, 2)';
const types = {
getTypeParser: () => () => 'foo'
}
const cursor = client.query(new Cursor(text, null, { types }))
getTypeParser: () => () => 'foo',
};
const cursor = client.query(new Cursor(text, null, { types }));
cursor.read(10, (err, rows) => {
assert(!err)
assert.deepStrictEqual(rows, [{ num: 'foo' }, { num: 'foo' }, { num: 'foo' }])
client.end()
done()
})
})
})
assert(!err);
assert.deepStrictEqual(rows, [{ num: 'foo' }, { num: 'foo' }, { num: 'foo' }]);
client.end();
done();
});
});
});

View File

@ -1,43 +1,43 @@
const assert = require('assert')
const Cursor = require('../')
const pg = require('pg')
const assert = require('assert');
const Cursor = require('../');
const pg = require('pg');
describe('transactions', () => {
it('can execute multiple statements in a transaction', async () => {
const client = new pg.Client()
await client.connect()
await client.query('begin')
await client.query('CREATE TEMP TABLE foobar(id SERIAL PRIMARY KEY)')
const cursor = client.query(new Cursor('SELECT * FROM foobar'))
const client = new pg.Client();
await client.connect();
await client.query('begin');
await client.query('CREATE TEMP TABLE foobar(id SERIAL PRIMARY KEY)');
const cursor = client.query(new Cursor('SELECT * FROM foobar'));
const rows = await new Promise((resolve, reject) => {
cursor.read(10, (err, rows) => (err ? reject(err) : resolve(rows)))
})
assert.strictEqual(rows.length, 0)
await client.query('ALTER TABLE foobar ADD COLUMN name TEXT')
await client.end()
})
cursor.read(10, (err, rows) => (err ? reject(err) : resolve(rows)));
});
assert.strictEqual(rows.length, 0);
await client.query('ALTER TABLE foobar ADD COLUMN name TEXT');
await client.end();
});
it('can execute multiple statements in a transaction if ending cursor early', async () => {
const client = new pg.Client()
await client.connect()
await client.query('begin')
await client.query('CREATE TEMP TABLE foobar(id SERIAL PRIMARY KEY)')
const cursor = client.query(new Cursor('SELECT * FROM foobar'))
await new Promise(resolve => cursor.close(resolve))
await client.query('ALTER TABLE foobar ADD COLUMN name TEXT')
await client.end()
})
const client = new pg.Client();
await client.connect();
await client.query('begin');
await client.query('CREATE TEMP TABLE foobar(id SERIAL PRIMARY KEY)');
const cursor = client.query(new Cursor('SELECT * FROM foobar'));
await new Promise((resolve) => cursor.close(resolve));
await client.query('ALTER TABLE foobar ADD COLUMN name TEXT');
await client.end();
});
it('can execute multiple statements in a transaction if no data', async () => {
const client = new pg.Client()
await client.connect()
await client.query('begin')
const client = new pg.Client();
await client.connect();
await client.query('begin');
// create a cursor that has no data response
const createText = 'CREATE TEMP TABLE foobar(id SERIAL PRIMARY KEY)'
const cursor = client.query(new Cursor(createText))
const err = await new Promise(resolve => cursor.read(100, resolve))
assert.ifError(err)
await client.query('ALTER TABLE foobar ADD COLUMN name TEXT')
await client.end()
})
})
const createText = 'CREATE TEMP TABLE foobar(id SERIAL PRIMARY KEY)';
const cursor = client.query(new Cursor(createText));
const err = await new Promise((resolve) => cursor.read(100, resolve));
assert.ifError(err);
await client.query('ALTER TABLE foobar ADD COLUMN name TEXT');
await client.end();
});
});