Fix named portal being left open

When code was added to use a random named portal instead of the empty portal to support redshift we didn't update the close messages approprately.  This could result in postgres keeping locks on tables for modification if streaming and table modification was both done within a transaction.  This update fixes the issue by always issuing a close command on the named portal when query is finished.

fixes #56
This commit is contained in:
Brian M. Carlson 2019-10-30 11:33:52 -05:00
parent 6d47026083
commit 389d5d8c14
4 changed files with 49 additions and 8 deletions

View File

@ -1,5 +1,8 @@
{
"extends": ["eslint:recommended"],
"parserOptions": {
"ecmaVersion": 2017
},
"plugins": ["prettier"],
"rules": {
"prettier/prettier": "error",

View File

@ -2,14 +2,14 @@ language: node_js
dist: trusty
sudo: false
node_js:
- "8"
- "10"
- "12"
- '8'
- '10'
- '12'
env:
- PGUSER=postgres
services:
- postgresql
addons:
postgresql: "9.6"
postgresql: '9.6'
before_script:
- psql -c 'create database travis;' -U postgres | true

View File

@ -75,6 +75,15 @@ Cursor.prototype._shiftQueue = function() {
}
}
Cursor.prototype._closePortal = function() {
// because we opened a named portal to stream results
// we need to close the same named portal. Leaving a named portal
// open can lock tables for modification if inside a transaction.
// see https://github.com/brianc/node-pg-cursor/issues/56
this.connection.close({ type: 'P', name: this._portal })
this.connection.sync()
}
Cursor.prototype.handleRowDescription = function(msg) {
this._result.addFields(msg.fields)
this.state = 'idle'
@ -105,7 +114,7 @@ Cursor.prototype._sendRows = function() {
Cursor.prototype.handleCommandComplete = function(msg) {
this._result.addCommandComplete(msg)
this.connection.sync()
this._closePortal()
}
Cursor.prototype.handlePortalSuspended = function() {
@ -114,8 +123,8 @@ Cursor.prototype.handlePortalSuspended = function() {
Cursor.prototype.handleReadyForQuery = function() {
this._sendRows()
this.emit('end', this._result)
this.state = 'done'
this.emit('end', this._result)
}
Cursor.prototype.handleEmptyQuery = function() {
@ -166,8 +175,7 @@ Cursor.prototype.close = function(cb) {
if (this.state === 'done') {
return setImmediate(cb)
}
this.connection.close({ type: 'P' })
this.connection.sync()
this._closePortal()
this.state = 'done'
if (cb) {
this.connection.once('closeComplete', function() {

30
test/transactions.js Normal file
View File

@ -0,0 +1,30 @@
const assert = require('assert')
const Cursor = require('../')
const pg = require('pg')
describe('transactions', () => {
it('can execute multiple statements in a transaction', async () => {
const client = new pg.Client()
await client.connect()
await client.query('begin')
await client.query('CREATE TEMP TABLE foobar(id SERIAL PRIMARY KEY)')
const cursor = client.query(new Cursor('SELECT * FROM foobar'))
const rows = await new Promise((resolve, reject) => {
cursor.read(10, (err, rows) => (err ? reject(err) : resolve(rows)))
})
assert.equal(rows.length, 0)
await client.query('ALTER TABLE foobar ADD COLUMN name TEXT')
await client.end()
})
it('can execute multiple statements in a transaction if ending cursor early', async () => {
const client = new pg.Client()
await client.connect()
await client.query('begin')
await client.query('CREATE TEMP TABLE foobar(id SERIAL PRIMARY KEY)')
const cursor = client.query(new Cursor('SELECT * FROM foobar'))
await new Promise(resolve => cursor.close(resolve))
await client.query('ALTER TABLE foobar ADD COLUMN name TEXT')
await client.end()
})
})