Add configurable query timeout (#1760)

* Add read_timeout to connection settings

* Fix uncaught error issue

* Fix lint

* Fix "queryCallback is not a function"

* Added test and fixed error returning

* Added query timeout to native client

* Added test for timeout not reached

* Ensure error is the correct one
Correct test name

* Removed dubious check

* Added new test

* Improved test
This commit is contained in:
André Cruz 2018-11-29 15:11:47 +00:00 committed by Brian C
parent 3620e23899
commit eb076db5d4
5 changed files with 121 additions and 2 deletions

View File

@ -399,15 +399,20 @@ Client.prototype.query = function (config, values, callback) {
// can take in strings, config object or query object
var query
var result
var readTimeout
var readTimeoutTimer
var queryCallback
if (config === null || config === undefined) {
throw new TypeError('Client was passed a null or undefined query')
} else if (typeof config.submit === 'function') {
readTimeout = config.query_timeout || this.connectionParameters.query_timeout
result = query = config
if (typeof values === 'function') {
query.callback = query.callback || values
}
} else {
readTimeout = this.connectionParameters.query_timeout
query = new Query(config, values, callback)
if (!query.callback) {
result = new this._Promise((resolve, reject) => {
@ -416,6 +421,37 @@ Client.prototype.query = function (config, values, callback) {
}
}
if (readTimeout) {
queryCallback = query.callback
readTimeoutTimer = setTimeout(() => {
var error = new Error('Query read timeout')
process.nextTick(() => {
query.handleError(error, this.connection)
})
queryCallback(error)
// we already returned an error,
// just do nothing if query completes
query.callback = () => {}
// Remove from queue
var index = this.queryQueue.indexOf(query)
if (index > -1) {
this.queryQueue.splice(index, 1)
}
this._pulseQueryQueue()
}, readTimeout)
query.callback = (err, res) => {
clearTimeout(readTimeoutTimer)
queryCallback(err, res)
}
}
if (this.binary && !query.binary) {
query.binary = true
}

View File

@ -65,6 +65,7 @@ var ConnectionParameters = function (config) {
this.application_name = val('application_name', config, 'PGAPPNAME')
this.fallback_application_name = val('fallback_application_name', config, false)
this.statement_timeout = val('statement_timeout', config, false)
this.query_timeout = val('query_timeout', config, false)
}
// Convert arg to a string, surround in single quotes, and escape single quotes and backslashes

View File

@ -55,7 +55,10 @@ module.exports = {
parseInputDatesAsUTC: false,
// max milliseconds any query using this connection will execute for before timing out in error. false=unlimited
statement_timeout: false
statement_timeout: false,
// max miliseconds to wait for query to complete (client side)
query_timeout: false
}
var pgTypes = require('pg-types')

View File

@ -146,14 +146,21 @@ Client.prototype.connect = function (callback) {
Client.prototype.query = function (config, values, callback) {
var query
var result
var readTimeout
var readTimeoutTimer
var queryCallback
if (typeof config.submit === 'function') {
if (config === null || config === undefined) {
throw new TypeError('Client was passed a null or undefined query')
} else if (typeof config.submit === 'function') {
readTimeout = config.query_timeout || this.connectionParameters.query_timeout
result = query = config
// accept query(new Query(...), (err, res) => { }) style
if (typeof values === 'function') {
config.callback = values
}
} else {
readTimeout = this.connectionParameters.query_timeout
query = new NativeQuery(config, values, callback)
if (!query.callback) {
let resolveOut, rejectOut
@ -165,6 +172,37 @@ Client.prototype.query = function (config, values, callback) {
}
}
if (readTimeout) {
queryCallback = query.callback
readTimeoutTimer = setTimeout(() => {
var error = new Error('Query read timeout')
process.nextTick(() => {
query.handleError(error, this.connection)
})
queryCallback(error)
// we already returned an error,
// just do nothing if query completes
query.callback = () => {}
// Remove from queue
var index = this._queryQueue.indexOf(query)
if (index > -1) {
this._queryQueue.splice(index, 1)
}
this._pulseQueryQueue()
}, readTimeout)
query.callback = (err, res) => {
clearTimeout(readTimeoutTimer)
queryCallback(err, res)
}
}
if (!this._queryable) {
query.native = this.native
process.nextTick(() => {

View File

@ -15,6 +15,47 @@ suite.test('pool callback behavior', done => {
})
})
suite.test('query timeout', (cb) => {
const pool = new pg.Pool({query_timeout: 1000})
pool.connect().then((client) => {
client.query('SELECT pg_sleep(2)', assert.calls(function (err, result) {
assert(err)
assert(err.message === 'Query read timeout')
client.release()
pool.end(cb)
}))
})
})
suite.test('query recover from timeout', (cb) => {
const pool = new pg.Pool({query_timeout: 1000})
pool.connect().then((client) => {
client.query('SELECT pg_sleep(20)', assert.calls(function (err, result) {
assert(err)
assert(err.message === 'Query read timeout')
client.release(err)
pool.connect().then((client) => {
client.query('SELECT 1', assert.calls(function (err, result) {
assert(!err)
client.release(err)
pool.end(cb)
}))
})
}))
})
})
suite.test('query no timeout', (cb) => {
const pool = new pg.Pool({query_timeout: 10000})
pool.connect().then((client) => {
client.query('SELECT pg_sleep(1)', assert.calls(function (err, result) {
assert(!err)
client.release()
pool.end(cb)
}))
})
})
suite.test('callback API', done => {
const client = new helper.Client()
client.query('CREATE TEMP TABLE peep(name text)')