mirror of
https://github.com/brianc/node-postgres.git
synced 2025-12-08 20:16:25 +00:00
Merge pull request #2278 from brianc/bmc/refactor-to-classes
Refactor to es6 classes
This commit is contained in:
commit
2793ca74dc
@ -61,7 +61,7 @@ const run = async () => {
|
||||
queries = await bench(client, insert, seconds * 1000)
|
||||
console.log('insert queries:', queries)
|
||||
console.log('qps', queries / seconds)
|
||||
console.log('on my laptop best so far seen 5799 qps')
|
||||
console.log('on my laptop best so far seen 6303 qps')
|
||||
|
||||
console.log('')
|
||||
console.log('Warming up bytea test')
|
||||
|
||||
File diff suppressed because it is too large
Load Diff
@ -40,73 +40,6 @@ var readSSLConfigFromEnvironment = function () {
|
||||
return defaults.ssl
|
||||
}
|
||||
|
||||
var ConnectionParameters = function (config) {
|
||||
// if a string is passed, it is a raw connection string so we parse it into a config
|
||||
config = typeof config === 'string' ? parse(config) : config || {}
|
||||
|
||||
// if the config has a connectionString defined, parse IT into the config we use
|
||||
// this will override other default values with what is stored in connectionString
|
||||
if (config.connectionString) {
|
||||
config = Object.assign({}, config, parse(config.connectionString))
|
||||
}
|
||||
|
||||
this.user = val('user', config)
|
||||
this.database = val('database', config)
|
||||
|
||||
if (this.database === undefined) {
|
||||
this.database = this.user
|
||||
}
|
||||
|
||||
this.port = parseInt(val('port', config), 10)
|
||||
this.host = val('host', config)
|
||||
|
||||
// "hiding" the password so it doesn't show up in stack traces
|
||||
// or if the client is console.logged
|
||||
Object.defineProperty(this, 'password', {
|
||||
configurable: true,
|
||||
enumerable: false,
|
||||
writable: true,
|
||||
value: val('password', config),
|
||||
})
|
||||
|
||||
this.binary = val('binary', config)
|
||||
this.options = val('options', config)
|
||||
|
||||
this.ssl = typeof config.ssl === 'undefined' ? readSSLConfigFromEnvironment() : config.ssl
|
||||
|
||||
// support passing in ssl=no-verify via connection string
|
||||
if (this.ssl === 'no-verify') {
|
||||
this.ssl = { rejectUnauthorized: false }
|
||||
}
|
||||
|
||||
this.client_encoding = val('client_encoding', config)
|
||||
this.replication = val('replication', config)
|
||||
// a domain socket begins with '/'
|
||||
this.isDomainSocket = !(this.host || '').indexOf('/')
|
||||
|
||||
this.application_name = val('application_name', config, 'PGAPPNAME')
|
||||
this.fallback_application_name = val('fallback_application_name', config, false)
|
||||
this.statement_timeout = val('statement_timeout', config, false)
|
||||
this.idle_in_transaction_session_timeout = val('idle_in_transaction_session_timeout', config, false)
|
||||
this.query_timeout = val('query_timeout', config, false)
|
||||
|
||||
if (config.connectionTimeoutMillis === undefined) {
|
||||
this.connect_timeout = process.env.PGCONNECT_TIMEOUT || 0
|
||||
} else {
|
||||
this.connect_timeout = Math.floor(config.connectionTimeoutMillis / 1000)
|
||||
}
|
||||
|
||||
if (config.keepAlive === false) {
|
||||
this.keepalives = 0
|
||||
} else if (config.keepAlive === true) {
|
||||
this.keepalives = 1
|
||||
}
|
||||
|
||||
if (typeof config.keepAliveInitialDelayMillis === 'number') {
|
||||
this.keepalives_idle = Math.floor(config.keepAliveInitialDelayMillis / 1000)
|
||||
}
|
||||
}
|
||||
|
||||
// Convert arg to a string, surround in single quotes, and escape single quotes and backslashes
|
||||
var quoteParamValue = function (value) {
|
||||
return "'" + ('' + value).replace(/\\/g, '\\\\').replace(/'/g, "\\'") + "'"
|
||||
@ -119,43 +52,112 @@ var add = function (params, config, paramName) {
|
||||
}
|
||||
}
|
||||
|
||||
ConnectionParameters.prototype.getLibpqConnectionString = function (cb) {
|
||||
var params = []
|
||||
add(params, this, 'user')
|
||||
add(params, this, 'password')
|
||||
add(params, this, 'port')
|
||||
add(params, this, 'application_name')
|
||||
add(params, this, 'fallback_application_name')
|
||||
add(params, this, 'connect_timeout')
|
||||
add(params, this, 'options')
|
||||
class ConnectionParameters {
|
||||
constructor(config) {
|
||||
// if a string is passed, it is a raw connection string so we parse it into a config
|
||||
config = typeof config === 'string' ? parse(config) : config || {}
|
||||
|
||||
var ssl = typeof this.ssl === 'object' ? this.ssl : this.ssl ? { sslmode: this.ssl } : {}
|
||||
add(params, ssl, 'sslmode')
|
||||
add(params, ssl, 'sslca')
|
||||
add(params, ssl, 'sslkey')
|
||||
add(params, ssl, 'sslcert')
|
||||
add(params, ssl, 'sslrootcert')
|
||||
// if the config has a connectionString defined, parse IT into the config we use
|
||||
// this will override other default values with what is stored in connectionString
|
||||
if (config.connectionString) {
|
||||
config = Object.assign({}, config, parse(config.connectionString))
|
||||
}
|
||||
|
||||
if (this.database) {
|
||||
params.push('dbname=' + quoteParamValue(this.database))
|
||||
this.user = val('user', config)
|
||||
this.database = val('database', config)
|
||||
|
||||
if (this.database === undefined) {
|
||||
this.database = this.user
|
||||
}
|
||||
|
||||
this.port = parseInt(val('port', config), 10)
|
||||
this.host = val('host', config)
|
||||
|
||||
// "hiding" the password so it doesn't show up in stack traces
|
||||
// or if the client is console.logged
|
||||
Object.defineProperty(this, 'password', {
|
||||
configurable: true,
|
||||
enumerable: false,
|
||||
writable: true,
|
||||
value: val('password', config),
|
||||
})
|
||||
|
||||
this.binary = val('binary', config)
|
||||
this.options = val('options', config)
|
||||
|
||||
this.ssl = typeof config.ssl === 'undefined' ? readSSLConfigFromEnvironment() : config.ssl
|
||||
|
||||
// support passing in ssl=no-verify via connection string
|
||||
if (this.ssl === 'no-verify') {
|
||||
this.ssl = { rejectUnauthorized: false }
|
||||
}
|
||||
|
||||
this.client_encoding = val('client_encoding', config)
|
||||
this.replication = val('replication', config)
|
||||
// a domain socket begins with '/'
|
||||
this.isDomainSocket = !(this.host || '').indexOf('/')
|
||||
|
||||
this.application_name = val('application_name', config, 'PGAPPNAME')
|
||||
this.fallback_application_name = val('fallback_application_name', config, false)
|
||||
this.statement_timeout = val('statement_timeout', config, false)
|
||||
this.idle_in_transaction_session_timeout = val('idle_in_transaction_session_timeout', config, false)
|
||||
this.query_timeout = val('query_timeout', config, false)
|
||||
|
||||
if (config.connectionTimeoutMillis === undefined) {
|
||||
this.connect_timeout = process.env.PGCONNECT_TIMEOUT || 0
|
||||
} else {
|
||||
this.connect_timeout = Math.floor(config.connectionTimeoutMillis / 1000)
|
||||
}
|
||||
|
||||
if (config.keepAlive === false) {
|
||||
this.keepalives = 0
|
||||
} else if (config.keepAlive === true) {
|
||||
this.keepalives = 1
|
||||
}
|
||||
|
||||
if (typeof config.keepAliveInitialDelayMillis === 'number') {
|
||||
this.keepalives_idle = Math.floor(config.keepAliveInitialDelayMillis / 1000)
|
||||
}
|
||||
}
|
||||
if (this.replication) {
|
||||
params.push('replication=' + quoteParamValue(this.replication))
|
||||
|
||||
getLibpqConnectionString(cb) {
|
||||
var params = []
|
||||
add(params, this, 'user')
|
||||
add(params, this, 'password')
|
||||
add(params, this, 'port')
|
||||
add(params, this, 'application_name')
|
||||
add(params, this, 'fallback_application_name')
|
||||
add(params, this, 'connect_timeout')
|
||||
add(params, this, 'options')
|
||||
|
||||
var ssl = typeof this.ssl === 'object' ? this.ssl : this.ssl ? { sslmode: this.ssl } : {}
|
||||
add(params, ssl, 'sslmode')
|
||||
add(params, ssl, 'sslca')
|
||||
add(params, ssl, 'sslkey')
|
||||
add(params, ssl, 'sslcert')
|
||||
add(params, ssl, 'sslrootcert')
|
||||
|
||||
if (this.database) {
|
||||
params.push('dbname=' + quoteParamValue(this.database))
|
||||
}
|
||||
if (this.replication) {
|
||||
params.push('replication=' + quoteParamValue(this.replication))
|
||||
}
|
||||
if (this.host) {
|
||||
params.push('host=' + quoteParamValue(this.host))
|
||||
}
|
||||
if (this.isDomainSocket) {
|
||||
return cb(null, params.join(' '))
|
||||
}
|
||||
if (this.client_encoding) {
|
||||
params.push('client_encoding=' + quoteParamValue(this.client_encoding))
|
||||
}
|
||||
dns.lookup(this.host, function (err, address) {
|
||||
if (err) return cb(err, null)
|
||||
params.push('hostaddr=' + quoteParamValue(address))
|
||||
return cb(null, params.join(' '))
|
||||
})
|
||||
}
|
||||
if (this.host) {
|
||||
params.push('host=' + quoteParamValue(this.host))
|
||||
}
|
||||
if (this.isDomainSocket) {
|
||||
return cb(null, params.join(' '))
|
||||
}
|
||||
if (this.client_encoding) {
|
||||
params.push('client_encoding=' + quoteParamValue(this.client_encoding))
|
||||
}
|
||||
dns.lookup(this.host, function (err, address) {
|
||||
if (err) return cb(err, null)
|
||||
params.push('hostaddr=' + quoteParamValue(address))
|
||||
return cb(null, params.join(' '))
|
||||
})
|
||||
}
|
||||
|
||||
module.exports = ConnectionParameters
|
||||
|
||||
@ -13,201 +13,199 @@ var util = require('util')
|
||||
|
||||
const { parse, serialize } = require('pg-protocol')
|
||||
|
||||
// TODO(bmc) support binary mode at some point
|
||||
var Connection = function (config) {
|
||||
EventEmitter.call(this)
|
||||
config = config || {}
|
||||
this.stream = config.stream || new net.Socket()
|
||||
this._keepAlive = config.keepAlive
|
||||
this._keepAliveInitialDelayMillis = config.keepAliveInitialDelayMillis
|
||||
this.lastBuffer = false
|
||||
this.parsedStatements = {}
|
||||
this.ssl = config.ssl || false
|
||||
this._ending = false
|
||||
this._emitMessage = false
|
||||
var self = this
|
||||
this.on('newListener', function (eventName) {
|
||||
if (eventName === 'message') {
|
||||
self._emitMessage = true
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
util.inherits(Connection, EventEmitter)
|
||||
|
||||
Connection.prototype.connect = function (port, host) {
|
||||
var self = this
|
||||
|
||||
this._connecting = true
|
||||
this.stream.setNoDelay(true)
|
||||
this.stream.connect(port, host)
|
||||
|
||||
this.stream.once('connect', function () {
|
||||
if (self._keepAlive) {
|
||||
self.stream.setKeepAlive(true, self._keepAliveInitialDelayMillis)
|
||||
}
|
||||
self.emit('connect')
|
||||
})
|
||||
|
||||
const reportStreamError = function (error) {
|
||||
// errors about disconnections should be ignored during disconnect
|
||||
if (self._ending && (error.code === 'ECONNRESET' || error.code === 'EPIPE')) {
|
||||
return
|
||||
}
|
||||
self.emit('error', error)
|
||||
}
|
||||
this.stream.on('error', reportStreamError)
|
||||
|
||||
this.stream.on('close', function () {
|
||||
self.emit('end')
|
||||
})
|
||||
|
||||
if (!this.ssl) {
|
||||
return this.attachListeners(this.stream)
|
||||
}
|
||||
|
||||
this.stream.once('data', function (buffer) {
|
||||
var responseCode = buffer.toString('utf8')
|
||||
switch (responseCode) {
|
||||
case 'S': // Server supports SSL connections, continue with a secure connection
|
||||
break
|
||||
case 'N': // Server does not support SSL connections
|
||||
self.stream.end()
|
||||
return self.emit('error', new Error('The server does not support SSL connections'))
|
||||
default:
|
||||
// Any other response byte, including 'E' (ErrorResponse) indicating a server error
|
||||
self.stream.end()
|
||||
return self.emit('error', new Error('There was an error establishing an SSL connection'))
|
||||
}
|
||||
var tls = require('tls')
|
||||
const options = Object.assign(
|
||||
{
|
||||
socket: self.stream,
|
||||
},
|
||||
self.ssl
|
||||
)
|
||||
if (net.isIP(host) === 0) {
|
||||
options.servername = host
|
||||
}
|
||||
self.stream = tls.connect(options)
|
||||
self.attachListeners(self.stream)
|
||||
self.stream.on('error', reportStreamError)
|
||||
|
||||
self.emit('sslconnect')
|
||||
})
|
||||
}
|
||||
|
||||
Connection.prototype.attachListeners = function (stream) {
|
||||
stream.on('end', () => {
|
||||
this.emit('end')
|
||||
})
|
||||
parse(stream, (msg) => {
|
||||
var eventName = msg.name === 'error' ? 'errorMessage' : msg.name
|
||||
if (this._emitMessage) {
|
||||
this.emit('message', msg)
|
||||
}
|
||||
this.emit(eventName, msg)
|
||||
})
|
||||
}
|
||||
|
||||
Connection.prototype.requestSsl = function () {
|
||||
this.stream.write(serialize.requestSsl())
|
||||
}
|
||||
|
||||
Connection.prototype.startup = function (config) {
|
||||
this.stream.write(serialize.startup(config))
|
||||
}
|
||||
|
||||
Connection.prototype.cancel = function (processID, secretKey) {
|
||||
this._send(serialize.cancel(processID, secretKey))
|
||||
}
|
||||
|
||||
Connection.prototype.password = function (password) {
|
||||
this._send(serialize.password(password))
|
||||
}
|
||||
|
||||
Connection.prototype.sendSASLInitialResponseMessage = function (mechanism, initialResponse) {
|
||||
this._send(serialize.sendSASLInitialResponseMessage(mechanism, initialResponse))
|
||||
}
|
||||
|
||||
Connection.prototype.sendSCRAMClientFinalMessage = function (additionalData) {
|
||||
this._send(serialize.sendSCRAMClientFinalMessage(additionalData))
|
||||
}
|
||||
|
||||
Connection.prototype._send = function (buffer) {
|
||||
if (!this.stream.writable) {
|
||||
return false
|
||||
}
|
||||
return this.stream.write(buffer)
|
||||
}
|
||||
|
||||
Connection.prototype.query = function (text) {
|
||||
this._send(serialize.query(text))
|
||||
}
|
||||
|
||||
// send parse message
|
||||
Connection.prototype.parse = function (query) {
|
||||
this._send(serialize.parse(query))
|
||||
}
|
||||
|
||||
// send bind message
|
||||
// "more" === true to buffer the message until flush() is called
|
||||
Connection.prototype.bind = function (config) {
|
||||
this._send(serialize.bind(config))
|
||||
}
|
||||
|
||||
// send execute message
|
||||
// "more" === true to buffer the message until flush() is called
|
||||
Connection.prototype.execute = function (config) {
|
||||
this._send(serialize.execute(config))
|
||||
}
|
||||
|
||||
const flushBuffer = serialize.flush()
|
||||
Connection.prototype.flush = function () {
|
||||
if (this.stream.writable) {
|
||||
this.stream.write(flushBuffer)
|
||||
}
|
||||
}
|
||||
|
||||
const syncBuffer = serialize.sync()
|
||||
Connection.prototype.sync = function () {
|
||||
this._ending = true
|
||||
this._send(flushBuffer)
|
||||
this._send(syncBuffer)
|
||||
}
|
||||
|
||||
const endBuffer = serialize.end()
|
||||
|
||||
Connection.prototype.end = function () {
|
||||
// 0x58 = 'X'
|
||||
this._ending = true
|
||||
if (!this._connecting || !this.stream.writable) {
|
||||
this.stream.end()
|
||||
return
|
||||
// TODO(bmc) support binary mode at some point
|
||||
class Connection extends EventEmitter {
|
||||
constructor(config) {
|
||||
super()
|
||||
config = config || {}
|
||||
this.stream = config.stream || new net.Socket()
|
||||
this._keepAlive = config.keepAlive
|
||||
this._keepAliveInitialDelayMillis = config.keepAliveInitialDelayMillis
|
||||
this.lastBuffer = false
|
||||
this.parsedStatements = {}
|
||||
this.ssl = config.ssl || false
|
||||
this._ending = false
|
||||
this._emitMessage = false
|
||||
var self = this
|
||||
this.on('newListener', function (eventName) {
|
||||
if (eventName === 'message') {
|
||||
self._emitMessage = true
|
||||
}
|
||||
})
|
||||
}
|
||||
return this.stream.write(endBuffer, () => {
|
||||
this.stream.end()
|
||||
})
|
||||
}
|
||||
|
||||
Connection.prototype.close = function (msg) {
|
||||
this._send(serialize.close(msg))
|
||||
}
|
||||
connect(port, host) {
|
||||
var self = this
|
||||
|
||||
Connection.prototype.describe = function (msg) {
|
||||
this._send(serialize.describe(msg))
|
||||
}
|
||||
this._connecting = true
|
||||
this.stream.setNoDelay(true)
|
||||
this.stream.connect(port, host)
|
||||
|
||||
Connection.prototype.sendCopyFromChunk = function (chunk) {
|
||||
this._send(serialize.copyData(chunk))
|
||||
}
|
||||
this.stream.once('connect', function () {
|
||||
if (self._keepAlive) {
|
||||
self.stream.setKeepAlive(true, self._keepAliveInitialDelayMillis)
|
||||
}
|
||||
self.emit('connect')
|
||||
})
|
||||
|
||||
Connection.prototype.endCopyFrom = function () {
|
||||
this._send(serialize.copyDone())
|
||||
}
|
||||
const reportStreamError = function (error) {
|
||||
// errors about disconnections should be ignored during disconnect
|
||||
if (self._ending && (error.code === 'ECONNRESET' || error.code === 'EPIPE')) {
|
||||
return
|
||||
}
|
||||
self.emit('error', error)
|
||||
}
|
||||
this.stream.on('error', reportStreamError)
|
||||
|
||||
Connection.prototype.sendCopyFail = function (msg) {
|
||||
this._send(serialize.copyFail(msg))
|
||||
this.stream.on('close', function () {
|
||||
self.emit('end')
|
||||
})
|
||||
|
||||
if (!this.ssl) {
|
||||
return this.attachListeners(this.stream)
|
||||
}
|
||||
|
||||
this.stream.once('data', function (buffer) {
|
||||
var responseCode = buffer.toString('utf8')
|
||||
switch (responseCode) {
|
||||
case 'S': // Server supports SSL connections, continue with a secure connection
|
||||
break
|
||||
case 'N': // Server does not support SSL connections
|
||||
self.stream.end()
|
||||
return self.emit('error', new Error('The server does not support SSL connections'))
|
||||
default:
|
||||
// Any other response byte, including 'E' (ErrorResponse) indicating a server error
|
||||
self.stream.end()
|
||||
return self.emit('error', new Error('There was an error establishing an SSL connection'))
|
||||
}
|
||||
var tls = require('tls')
|
||||
const options = Object.assign(
|
||||
{
|
||||
socket: self.stream,
|
||||
},
|
||||
self.ssl
|
||||
)
|
||||
if (net.isIP(host) === 0) {
|
||||
options.servername = host
|
||||
}
|
||||
self.stream = tls.connect(options)
|
||||
self.attachListeners(self.stream)
|
||||
self.stream.on('error', reportStreamError)
|
||||
|
||||
self.emit('sslconnect')
|
||||
})
|
||||
}
|
||||
|
||||
attachListeners(stream) {
|
||||
stream.on('end', () => {
|
||||
this.emit('end')
|
||||
})
|
||||
parse(stream, (msg) => {
|
||||
var eventName = msg.name === 'error' ? 'errorMessage' : msg.name
|
||||
if (this._emitMessage) {
|
||||
this.emit('message', msg)
|
||||
}
|
||||
this.emit(eventName, msg)
|
||||
})
|
||||
}
|
||||
|
||||
requestSsl() {
|
||||
this.stream.write(serialize.requestSsl())
|
||||
}
|
||||
|
||||
startup(config) {
|
||||
this.stream.write(serialize.startup(config))
|
||||
}
|
||||
|
||||
cancel(processID, secretKey) {
|
||||
this._send(serialize.cancel(processID, secretKey))
|
||||
}
|
||||
|
||||
password(password) {
|
||||
this._send(serialize.password(password))
|
||||
}
|
||||
|
||||
sendSASLInitialResponseMessage(mechanism, initialResponse) {
|
||||
this._send(serialize.sendSASLInitialResponseMessage(mechanism, initialResponse))
|
||||
}
|
||||
|
||||
sendSCRAMClientFinalMessage(additionalData) {
|
||||
this._send(serialize.sendSCRAMClientFinalMessage(additionalData))
|
||||
}
|
||||
|
||||
_send(buffer) {
|
||||
if (!this.stream.writable) {
|
||||
return false
|
||||
}
|
||||
return this.stream.write(buffer)
|
||||
}
|
||||
|
||||
query(text) {
|
||||
this._send(serialize.query(text))
|
||||
}
|
||||
|
||||
// send parse message
|
||||
parse(query) {
|
||||
this._send(serialize.parse(query))
|
||||
}
|
||||
|
||||
// send bind message
|
||||
bind(config) {
|
||||
this._send(serialize.bind(config))
|
||||
}
|
||||
|
||||
// send execute message
|
||||
execute(config) {
|
||||
this._send(serialize.execute(config))
|
||||
}
|
||||
|
||||
flush() {
|
||||
if (this.stream.writable) {
|
||||
this.stream.write(flushBuffer)
|
||||
}
|
||||
}
|
||||
|
||||
sync() {
|
||||
this._ending = true
|
||||
this._send(flushBuffer)
|
||||
this._send(syncBuffer)
|
||||
}
|
||||
|
||||
end() {
|
||||
// 0x58 = 'X'
|
||||
this._ending = true
|
||||
if (!this._connecting || !this.stream.writable) {
|
||||
this.stream.end()
|
||||
return
|
||||
}
|
||||
return this.stream.write(endBuffer, () => {
|
||||
this.stream.end()
|
||||
})
|
||||
}
|
||||
|
||||
close(msg) {
|
||||
this._send(serialize.close(msg))
|
||||
}
|
||||
|
||||
describe(msg) {
|
||||
this._send(serialize.describe(msg))
|
||||
}
|
||||
|
||||
sendCopyFromChunk(chunk) {
|
||||
this._send(serialize.copyData(chunk))
|
||||
}
|
||||
|
||||
endCopyFrom() {
|
||||
this._send(serialize.copyDone())
|
||||
}
|
||||
|
||||
sendCopyFail(msg) {
|
||||
this._send(serialize.copyFail(msg))
|
||||
}
|
||||
}
|
||||
|
||||
module.exports = Connection
|
||||
|
||||
@ -176,30 +176,26 @@ class Query extends EventEmitter {
|
||||
}
|
||||
|
||||
_getRows(connection, rows) {
|
||||
connection.execute(
|
||||
{
|
||||
portal: this.portal,
|
||||
rows: rows,
|
||||
},
|
||||
true
|
||||
)
|
||||
connection.execute({
|
||||
portal: this.portal,
|
||||
rows: rows,
|
||||
})
|
||||
connection.flush()
|
||||
}
|
||||
|
||||
// http://developer.postgresql.org/pgdocs/postgres/protocol-flow.html#PROTOCOL-FLOW-EXT-QUERY
|
||||
prepare(connection) {
|
||||
// prepared statements need sync to be called after each command
|
||||
// complete or when an error is encountered
|
||||
this.isPreparedStatement = true
|
||||
|
||||
// TODO refactor this poor encapsulation
|
||||
if (!this.hasBeenParsed(connection)) {
|
||||
connection.parse(
|
||||
{
|
||||
text: this.text,
|
||||
name: this.name,
|
||||
types: this.types,
|
||||
},
|
||||
true
|
||||
)
|
||||
connection.parse({
|
||||
text: this.text,
|
||||
name: this.name,
|
||||
types: this.types,
|
||||
})
|
||||
}
|
||||
|
||||
if (this.values) {
|
||||
@ -211,24 +207,17 @@ class Query extends EventEmitter {
|
||||
}
|
||||
}
|
||||
|
||||
// http://developer.postgresql.org/pgdocs/postgres/protocol-flow.html#PROTOCOL-FLOW-EXT-QUERY
|
||||
connection.bind(
|
||||
{
|
||||
portal: this.portal,
|
||||
statement: this.name,
|
||||
values: this.values,
|
||||
binary: this.binary,
|
||||
},
|
||||
true
|
||||
)
|
||||
connection.bind({
|
||||
portal: this.portal,
|
||||
statement: this.name,
|
||||
values: this.values,
|
||||
binary: this.binary,
|
||||
})
|
||||
|
||||
connection.describe(
|
||||
{
|
||||
type: 'P',
|
||||
name: this.portal || '',
|
||||
},
|
||||
true
|
||||
)
|
||||
connection.describe({
|
||||
type: 'P',
|
||||
name: this.portal || '',
|
||||
})
|
||||
|
||||
this._getRows(connection, this.rows)
|
||||
}
|
||||
|
||||
@ -9,95 +9,170 @@
|
||||
|
||||
var types = require('pg-types')
|
||||
|
||||
var matchRegexp = /^([A-Za-z]+)(?: (\d+))?(?: (\d+))?/
|
||||
|
||||
// result object returned from query
|
||||
// in the 'end' event and also
|
||||
// passed as second argument to provided callback
|
||||
var Result = function (rowMode, types) {
|
||||
this.command = null
|
||||
this.rowCount = null
|
||||
this.oid = null
|
||||
this.rows = []
|
||||
this.fields = []
|
||||
this._parsers = undefined
|
||||
this._types = types
|
||||
this.RowCtor = null
|
||||
this.rowAsArray = rowMode === 'array'
|
||||
if (this.rowAsArray) {
|
||||
this.parseRow = this._parseRowAsArray
|
||||
}
|
||||
}
|
||||
|
||||
var matchRegexp = /^([A-Za-z]+)(?: (\d+))?(?: (\d+))?/
|
||||
|
||||
// adds a command complete message
|
||||
Result.prototype.addCommandComplete = function (msg) {
|
||||
var match
|
||||
if (msg.text) {
|
||||
// pure javascript
|
||||
match = matchRegexp.exec(msg.text)
|
||||
} else {
|
||||
// native bindings
|
||||
match = matchRegexp.exec(msg.command)
|
||||
}
|
||||
if (match) {
|
||||
this.command = match[1]
|
||||
if (match[3]) {
|
||||
// COMMMAND OID ROWS
|
||||
this.oid = parseInt(match[2], 10)
|
||||
this.rowCount = parseInt(match[3], 10)
|
||||
} else if (match[2]) {
|
||||
// COMMAND ROWS
|
||||
this.rowCount = parseInt(match[2], 10)
|
||||
class Result {
|
||||
constructor(rowMode, types) {
|
||||
this.command = null
|
||||
this.rowCount = null
|
||||
this.oid = null
|
||||
this.rows = []
|
||||
this.fields = []
|
||||
this._parsers = undefined
|
||||
this._types = types
|
||||
this.RowCtor = null
|
||||
this.rowAsArray = rowMode === 'array'
|
||||
if (this.rowAsArray) {
|
||||
this.parseRow = this._parseRowAsArray
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Result.prototype._parseRowAsArray = function (rowData) {
|
||||
var row = new Array(rowData.length)
|
||||
for (var i = 0, len = rowData.length; i < len; i++) {
|
||||
var rawValue = rowData[i]
|
||||
if (rawValue !== null) {
|
||||
row[i] = this._parsers[i](rawValue)
|
||||
// adds a command complete message
|
||||
addCommandComplete(msg) {
|
||||
var match
|
||||
if (msg.text) {
|
||||
// pure javascript
|
||||
match = matchRegexp.exec(msg.text)
|
||||
} else {
|
||||
row[i] = null
|
||||
// native bindings
|
||||
match = matchRegexp.exec(msg.command)
|
||||
}
|
||||
if (match) {
|
||||
this.command = match[1]
|
||||
if (match[3]) {
|
||||
// COMMMAND OID ROWS
|
||||
this.oid = parseInt(match[2], 10)
|
||||
this.rowCount = parseInt(match[3], 10)
|
||||
} else if (match[2]) {
|
||||
// COMMAND ROWS
|
||||
this.rowCount = parseInt(match[2], 10)
|
||||
}
|
||||
}
|
||||
}
|
||||
return row
|
||||
}
|
||||
|
||||
Result.prototype.parseRow = function (rowData) {
|
||||
var row = {}
|
||||
for (var i = 0, len = rowData.length; i < len; i++) {
|
||||
var rawValue = rowData[i]
|
||||
var field = this.fields[i].name
|
||||
if (rawValue !== null) {
|
||||
row[field] = this._parsers[i](rawValue)
|
||||
} else {
|
||||
row[field] = null
|
||||
_parseRowAsArray(rowData) {
|
||||
var row = new Array(rowData.length)
|
||||
for (var i = 0, len = rowData.length; i < len; i++) {
|
||||
var rawValue = rowData[i]
|
||||
if (rawValue !== null) {
|
||||
row[i] = this._parsers[i](rawValue)
|
||||
} else {
|
||||
row[i] = null
|
||||
}
|
||||
}
|
||||
return row
|
||||
}
|
||||
|
||||
parseRow(rowData) {
|
||||
var row = {}
|
||||
for (var i = 0, len = rowData.length; i < len; i++) {
|
||||
var rawValue = rowData[i]
|
||||
var field = this.fields[i].name
|
||||
if (rawValue !== null) {
|
||||
row[field] = this._parsers[i](rawValue)
|
||||
} else {
|
||||
row[field] = null
|
||||
}
|
||||
}
|
||||
return row
|
||||
}
|
||||
|
||||
addRow(row) {
|
||||
this.rows.push(row)
|
||||
}
|
||||
|
||||
addFields(fieldDescriptions) {
|
||||
// clears field definitions
|
||||
// multiple query statements in 1 action can result in multiple sets
|
||||
// of rowDescriptions...eg: 'select NOW(); select 1::int;'
|
||||
// you need to reset the fields
|
||||
this.fields = fieldDescriptions
|
||||
if (this.fields.length) {
|
||||
this._parsers = new Array(fieldDescriptions.length)
|
||||
}
|
||||
for (var i = 0; i < fieldDescriptions.length; i++) {
|
||||
var desc = fieldDescriptions[i]
|
||||
if (this._types) {
|
||||
this._parsers[i] = this._types.getTypeParser(desc.dataTypeID, desc.format || 'text')
|
||||
} else {
|
||||
this._parsers[i] = types.getTypeParser(desc.dataTypeID, desc.format || 'text')
|
||||
}
|
||||
}
|
||||
}
|
||||
return row
|
||||
}
|
||||
|
||||
Result.prototype.addRow = function (row) {
|
||||
this.rows.push(row)
|
||||
}
|
||||
|
||||
Result.prototype.addFields = function (fieldDescriptions) {
|
||||
// clears field definitions
|
||||
// multiple query statements in 1 action can result in multiple sets
|
||||
// of rowDescriptions...eg: 'select NOW(); select 1::int;'
|
||||
// you need to reset the fields
|
||||
this.fields = fieldDescriptions
|
||||
if (this.fields.length) {
|
||||
this._parsers = new Array(fieldDescriptions.length)
|
||||
}
|
||||
for (var i = 0; i < fieldDescriptions.length; i++) {
|
||||
var desc = fieldDescriptions[i]
|
||||
if (this._types) {
|
||||
this._parsers[i] = this._types.getTypeParser(desc.dataTypeID, desc.format || 'text')
|
||||
// adds a command complete message
|
||||
addCommandComplete(msg) {
|
||||
var match
|
||||
if (msg.text) {
|
||||
// pure javascript
|
||||
match = matchRegexp.exec(msg.text)
|
||||
} else {
|
||||
this._parsers[i] = types.getTypeParser(desc.dataTypeID, desc.format || 'text')
|
||||
// native bindings
|
||||
match = matchRegexp.exec(msg.command)
|
||||
}
|
||||
if (match) {
|
||||
this.command = match[1]
|
||||
if (match[3]) {
|
||||
// COMMMAND OID ROWS
|
||||
this.oid = parseInt(match[2], 10)
|
||||
this.rowCount = parseInt(match[3], 10)
|
||||
} else if (match[2]) {
|
||||
// COMMAND ROWS
|
||||
this.rowCount = parseInt(match[2], 10)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
_parseRowAsArray(rowData) {
|
||||
var row = new Array(rowData.length)
|
||||
for (var i = 0, len = rowData.length; i < len; i++) {
|
||||
var rawValue = rowData[i]
|
||||
if (rawValue !== null) {
|
||||
row[i] = this._parsers[i](rawValue)
|
||||
} else {
|
||||
row[i] = null
|
||||
}
|
||||
}
|
||||
return row
|
||||
}
|
||||
|
||||
parseRow(rowData) {
|
||||
var row = {}
|
||||
for (var i = 0, len = rowData.length; i < len; i++) {
|
||||
var rawValue = rowData[i]
|
||||
var field = this.fields[i].name
|
||||
if (rawValue !== null) {
|
||||
row[field] = this._parsers[i](rawValue)
|
||||
} else {
|
||||
row[field] = null
|
||||
}
|
||||
}
|
||||
return row
|
||||
}
|
||||
|
||||
addRow(row) {
|
||||
this.rows.push(row)
|
||||
}
|
||||
|
||||
addFields(fieldDescriptions) {
|
||||
// clears field definitions
|
||||
// multiple query statements in 1 action can result in multiple sets
|
||||
// of rowDescriptions...eg: 'select NOW(); select 1::int;'
|
||||
// you need to reset the fields
|
||||
this.fields = fieldDescriptions
|
||||
if (this.fields.length) {
|
||||
this._parsers = new Array(fieldDescriptions.length)
|
||||
}
|
||||
for (var i = 0; i < fieldDescriptions.length; i++) {
|
||||
var desc = fieldDescriptions[i]
|
||||
if (this._types) {
|
||||
this._parsers[i] = this._types.getTypeParser(desc.dataTypeID, desc.format || 'text')
|
||||
} else {
|
||||
this._parsers[i] = types.getTypeParser(desc.dataTypeID, desc.format || 'text')
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user