mirror of
https://github.com/brianc/node-postgres.git
synced 2025-12-08 20:16:25 +00:00
Passing nothing for both the query.text and query.name is unsupported but previously crashed with an impossible to catch error.
208 lines
5.7 KiB
JavaScript
208 lines
5.7 KiB
JavaScript
"use strict";
|
|
/**
|
|
* Copyright (c) 2010-2017 Brian Carlson (brian.m.carlson@gmail.com)
|
|
* All rights reserved.
|
|
*
|
|
* This source code is licensed under the MIT license found in the
|
|
* README.md file in the root directory of this source tree.
|
|
*/
|
|
|
|
var EventEmitter = require('events').EventEmitter;
|
|
var util = require('util');
|
|
|
|
var Result = require('./result');
|
|
var utils = require('./utils');
|
|
|
|
var Query = function(config, values, callback) {
|
|
// use of "new" optional
|
|
if(!(this instanceof Query)) { return new Query(config, values, callback); }
|
|
|
|
config = utils.normalizeQueryConfig(config, values, callback);
|
|
|
|
this.text = config.text;
|
|
this.values = config.values;
|
|
this.rows = config.rows;
|
|
this.types = config.types;
|
|
this.name = config.name;
|
|
this.binary = config.binary;
|
|
this.stream = config.stream;
|
|
//use unique portal name each time
|
|
this.portal = config.portal || "";
|
|
this.callback = config.callback;
|
|
if(process.domain && config.callback) {
|
|
this.callback = process.domain.bind(config.callback);
|
|
}
|
|
this._result = new Result(config.rowMode, config.types);
|
|
this.isPreparedStatement = false;
|
|
this._canceledDueToError = false;
|
|
this._promise = null;
|
|
EventEmitter.call(this);
|
|
};
|
|
|
|
util.inherits(Query, EventEmitter);
|
|
|
|
Query.prototype.requiresPreparation = function() {
|
|
//named queries must always be prepared
|
|
if(this.name) { return true; }
|
|
//always prepare if there are max number of rows expected per
|
|
//portal execution
|
|
if(this.rows) { return true; }
|
|
//don't prepare empty text queries
|
|
if(!this.text) { return false; }
|
|
//prepare if there are values
|
|
if(!this.values) { return false; }
|
|
return this.values.length > 0;
|
|
};
|
|
|
|
//associates row metadata from the supplied
|
|
//message with this query object
|
|
//metadata used when parsing row results
|
|
Query.prototype.handleRowDescription = function(msg) {
|
|
this._result.addFields(msg.fields);
|
|
this._accumulateRows = this.callback || !this.listeners('row').length;
|
|
};
|
|
|
|
Query.prototype.handleDataRow = function(msg) {
|
|
var row;
|
|
|
|
if (this._canceledDueToError) {
|
|
return;
|
|
}
|
|
|
|
try {
|
|
row = this._result.parseRow(msg.fields);
|
|
} catch (err) {
|
|
this._canceledDueToError = err;
|
|
return;
|
|
}
|
|
|
|
this.emit('row', row, this._result);
|
|
if (this._accumulateRows) {
|
|
this._result.addRow(row);
|
|
}
|
|
};
|
|
|
|
Query.prototype.handleCommandComplete = function(msg, con) {
|
|
this._result.addCommandComplete(msg);
|
|
//need to sync after each command complete of a prepared statement
|
|
if(this.isPreparedStatement) {
|
|
con.sync();
|
|
}
|
|
};
|
|
|
|
//if a named prepared statement is created with empty query text
|
|
//the backend will send an emptyQuery message but *not* a command complete message
|
|
//execution on the connection will hang until the backend receives a sync message
|
|
Query.prototype.handleEmptyQuery = function(con) {
|
|
if (this.isPreparedStatement) {
|
|
con.sync();
|
|
}
|
|
};
|
|
|
|
Query.prototype.handleReadyForQuery = function(con) {
|
|
if(this._canceledDueToError) {
|
|
return this.handleError(this._canceledDueToError, con);
|
|
}
|
|
if(this.callback) {
|
|
this.callback(null, this._result);
|
|
}
|
|
this.emit('end', this._result);
|
|
};
|
|
|
|
Query.prototype.handleError = function(err, connection) {
|
|
//need to sync after error during a prepared statement
|
|
if(this.isPreparedStatement) {
|
|
connection.sync();
|
|
}
|
|
if(this._canceledDueToError) {
|
|
err = this._canceledDueToError;
|
|
this._canceledDueToError = false;
|
|
}
|
|
//if callback supplied do not emit error event as uncaught error
|
|
//events will bubble up to node process
|
|
if(this.callback) {
|
|
return this.callback(err);
|
|
}
|
|
this.emit('error', err);
|
|
};
|
|
|
|
Query.prototype.submit = function(connection) {
|
|
if (typeof this.text != 'string' && typeof this.name != 'string') {
|
|
const err = new Error('A query must have either text or a name. Supplying neither is unsupported.')
|
|
connection.emit('error', err)
|
|
connection.emit('readyForQuery')
|
|
return
|
|
}
|
|
if(this.requiresPreparation()) {
|
|
this.prepare(connection);
|
|
} else {
|
|
connection.query(this.text);
|
|
}
|
|
};
|
|
|
|
Query.prototype.hasBeenParsed = function(connection) {
|
|
return this.name && connection.parsedStatements[this.name];
|
|
};
|
|
|
|
Query.prototype.handlePortalSuspended = function(connection) {
|
|
this._getRows(connection, this.rows);
|
|
};
|
|
|
|
Query.prototype._getRows = function(connection, rows) {
|
|
connection.execute({
|
|
portal: this.portalName,
|
|
rows: rows
|
|
}, true);
|
|
connection.flush();
|
|
};
|
|
|
|
Query.prototype.prepare = function(connection) {
|
|
var self = this;
|
|
//prepared statements need sync to be called after each command
|
|
//complete or when an error is encountered
|
|
this.isPreparedStatement = true;
|
|
//TODO refactor this poor encapsulation
|
|
if(!this.hasBeenParsed(connection)) {
|
|
connection.parse({
|
|
text: self.text,
|
|
name: self.name,
|
|
types: self.types
|
|
}, true);
|
|
}
|
|
|
|
if(self.values) {
|
|
self.values = self.values.map(utils.prepareValue);
|
|
}
|
|
|
|
//http://developer.postgresql.org/pgdocs/postgres/protocol-flow.html#PROTOCOL-FLOW-EXT-QUERY
|
|
connection.bind({
|
|
portal: self.portalName,
|
|
statement: self.name,
|
|
values: self.values,
|
|
binary: self.binary
|
|
}, true);
|
|
|
|
connection.describe({
|
|
type: 'P',
|
|
name: self.portalName || ""
|
|
}, true);
|
|
|
|
this._getRows(connection, this.rows);
|
|
};
|
|
|
|
Query.prototype.handleCopyInResponse = function (connection) {
|
|
if(this.stream) this.stream.startStreamingToConnection(connection);
|
|
else connection.sendCopyFail('No source stream defined');
|
|
};
|
|
|
|
Query.prototype.handleCopyData = function (msg, connection) {
|
|
var chunk = msg.chunk;
|
|
if(this.stream) {
|
|
this.stream.handleChunk(chunk);
|
|
}
|
|
//if there are no stream (for example when copy to query was sent by
|
|
//query method instead of copyTo) error will be handled
|
|
//on copyOutResponse event, so silently ignore this error here
|
|
};
|
|
module.exports = Query;
|