mirror of
https://github.com/brianc/node-postgres.git
synced 2025-12-08 20:16:25 +00:00
Fix double readyForQuery (#2420)
This is fixing a double readyForQuery message being sent from the backend (because we were calling sync after an error, which I already fixed in the main driver). Also closes #2333
This commit is contained in:
parent
c6aa29ade9
commit
4fde8b78f1
@ -37,6 +37,7 @@ Cursor.prototype._rowDescription = function () {
|
||||
}
|
||||
|
||||
Cursor.prototype.submit = function (connection) {
|
||||
this.state = 'submitted'
|
||||
this.connection = connection
|
||||
this._portal = 'C_' + nextUniqueID++
|
||||
|
||||
@ -87,7 +88,12 @@ Cursor.prototype._closePortal = function () {
|
||||
// open can lock tables for modification if inside a transaction.
|
||||
// see https://github.com/brianc/node-pg-cursor/issues/56
|
||||
this.connection.close({ type: 'P', name: this._portal })
|
||||
this.connection.sync()
|
||||
|
||||
// If we've received an error we already sent a sync message.
|
||||
// do not send another sync as it triggers another readyForQuery message.
|
||||
if (this.state !== 'error') {
|
||||
this.connection.sync()
|
||||
}
|
||||
}
|
||||
|
||||
Cursor.prototype.handleRowDescription = function (msg) {
|
||||
@ -138,8 +144,18 @@ Cursor.prototype.handleEmptyQuery = function () {
|
||||
}
|
||||
|
||||
Cursor.prototype.handleError = function (msg) {
|
||||
this.connection.removeListener('noData', this._ifNoData)
|
||||
this.connection.removeListener('rowDescription', this._rowDescription)
|
||||
// If we're in an initialized state we've never been submitted
|
||||
// and don't have a connection instance reference yet.
|
||||
// This can happen if you queue a stream and close the client before
|
||||
// the client has submitted the stream. In this scenario we don't have
|
||||
// a connection so there's nothing to unsubscribe from.
|
||||
if (this.state !== 'initialized') {
|
||||
this.connection.removeListener('noData', this._ifNoData)
|
||||
this.connection.removeListener('rowDescription', this._rowDescription)
|
||||
// call sync to trigger a readyForQuery
|
||||
this.connection.sync()
|
||||
}
|
||||
|
||||
this.state = 'error'
|
||||
this._error = msg
|
||||
// satisfy any waiting callback
|
||||
@ -155,8 +171,6 @@ Cursor.prototype.handleError = function (msg) {
|
||||
// only dispatch error events if we have a listener
|
||||
this.emit('error', msg)
|
||||
}
|
||||
// call sync to keep this connection from hanging
|
||||
this.connection.sync()
|
||||
}
|
||||
|
||||
Cursor.prototype._getRows = function (rows, cb) {
|
||||
@ -189,6 +203,7 @@ Cursor.prototype.close = function (cb) {
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
this._closePortal()
|
||||
this.state = 'done'
|
||||
if (cb) {
|
||||
@ -199,7 +214,7 @@ Cursor.prototype.close = function (cb) {
|
||||
}
|
||||
|
||||
Cursor.prototype.read = function (rows, cb) {
|
||||
if (this.state === 'idle') {
|
||||
if (this.state === 'idle' || this.state === 'submitted') {
|
||||
return this._getRows(rows, cb)
|
||||
}
|
||||
if (this.state === 'busy' || this.state === 'initialized') {
|
||||
|
||||
@ -1,6 +1,7 @@
|
||||
import assert from 'assert'
|
||||
import helper from './helper'
|
||||
import QueryStream from '../src'
|
||||
import { Pool, Client } from 'pg'
|
||||
|
||||
helper('error', function (client) {
|
||||
it('receives error on stream', function (done) {
|
||||
@ -21,3 +22,71 @@ helper('error', function (client) {
|
||||
client.query('SELECT NOW()', done)
|
||||
})
|
||||
})
|
||||
|
||||
describe('error recovery', () => {
|
||||
// created from https://github.com/chrisdickinson/pg-test-case
|
||||
it('recovers from a streaming error in a transaction', async () => {
|
||||
const pool = new Pool()
|
||||
const client = await pool.connect()
|
||||
await client.query(`CREATE TEMP TABLE frobnicators (
|
||||
id serial primary key,
|
||||
updated timestamp
|
||||
)`)
|
||||
await client.query(`BEGIN;`)
|
||||
const query = new QueryStream(`INSERT INTO frobnicators ("updated") VALUES ($1) RETURNING "id"`, [Date.now()])
|
||||
let error: Error | undefined = undefined
|
||||
query.on('data', console.log).on('error', (e) => {
|
||||
error = e
|
||||
})
|
||||
client.query(query) // useless callback necessitated by an older version of honeycomb-beeline
|
||||
|
||||
await client.query(`ROLLBACK`)
|
||||
assert(error, 'Error should not be undefined')
|
||||
const { rows } = await client.query('SELECT NOW()')
|
||||
assert.strictEqual(rows.length, 1)
|
||||
client.release()
|
||||
const client2 = await pool.connect()
|
||||
await client2.query(`BEGIN`)
|
||||
client2.release()
|
||||
pool.end()
|
||||
})
|
||||
|
||||
// created from https://github.com/brianc/node-postgres/pull/2333
|
||||
it('handles an error on a stream after a plain text non-stream error', async () => {
|
||||
const client = new Client()
|
||||
const stmt = 'SELECT * FROM goose;'
|
||||
await client.connect()
|
||||
return new Promise((resolve, reject) => {
|
||||
client.query(stmt).catch((e) => {
|
||||
assert(e, 'Query should have rejected with an error')
|
||||
const stream = new QueryStream('SELECT * FROM duck')
|
||||
client.query(stream)
|
||||
stream.on('data', () => {})
|
||||
stream.on('error', () => {
|
||||
client.end((err) => {
|
||||
err ? reject(err) : resolve()
|
||||
})
|
||||
})
|
||||
})
|
||||
})
|
||||
})
|
||||
|
||||
it('does not crash when closing a connection with a queued stream', async () => {
|
||||
const client = new Client()
|
||||
const stmt = 'SELECT * FROM goose;'
|
||||
await client.connect()
|
||||
return new Promise(async (resolve) => {
|
||||
let queryError: Error | undefined
|
||||
client.query(stmt).catch((e) => {
|
||||
queryError = e
|
||||
})
|
||||
const stream = client.query(new QueryStream(stmt))
|
||||
stream.on('data', () => {})
|
||||
stream.on('error', () => {
|
||||
assert(queryError, 'query should have errored due to client ending')
|
||||
resolve()
|
||||
})
|
||||
await client.end()
|
||||
})
|
||||
})
|
||||
})
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user