mirror of
https://github.com/brianc/node-postgres.git
synced 2025-12-08 20:16:25 +00:00
On Windows, after a connect attempt has failed, an error event with ECONNRESET is emitted after the real connect error is propagated to the connect callback, because the connection is not in ending state (connection._ending) where ECONNRESET is ignored. This change ends the connection when connect has failed. This fixes #746.
355 lines
9.0 KiB
JavaScript
355 lines
9.0 KiB
JavaScript
/**
 * Copyright (c) 2010-2016 Brian Carlson (brian.m.carlson@gmail.com)
 * All rights reserved.
 *
 * This source code is licensed under the MIT license found in the
 * README.md file in the root directory of this source tree.
 */
|
|
|
|
var crypto = require('crypto');
|
|
var EventEmitter = require('events').EventEmitter;
|
|
var util = require('util');
|
|
var pgPass = require('pgpass');
|
|
var TypeOverrides = require('./type-overrides');
|
|
|
|
var ConnectionParameters = require('./connection-parameters');
|
|
var Query = require('./query');
|
|
var defaults = require('./defaults');
|
|
var Connection = require('./connection');
|
|
|
|
/**
 * Client constructor.
 *
 * Resolves connection settings through ConnectionParameters, copies the
 * resolved user/database/port/host/password onto the instance, and prepares
 * the underlying Connection plus the (initially empty) query queue.
 *
 * @param {*} [config] passed unchanged to ConnectionParameters. When it is an
 *   object, the following properties are also read here:
 *   `types` (handed to TypeOverrides), `connection` (used instead of creating
 *   a new Connection), `stream` and `keepAlive` (forwarded to the new
 *   Connection), and `binary` (falls back to defaults.binary when falsy).
 */
var Client = function(config) {
  EventEmitter.call(this);

  this.connectionParameters = new ConnectionParameters(config);
  this.user = this.connectionParameters.user;
  this.database = this.connectionParameters.database;
  this.port = this.connectionParameters.port;
  this.host = this.connectionParameters.host;
  this.password = this.connectionParameters.password;

  // tolerate a missing config so the option lookups below never throw
  var c = config || {};

  this._types = new TypeOverrides(c.types);

  // a caller-supplied connection wins; otherwise build one from the options
  this.connection = c.connection || new Connection({
    stream: c.stream,
    ssl: this.connectionParameters.ssl,
    keepAlive: c.keepAlive || false
  });
  this.queryQueue = [];
  this.binary = c.binary || defaults.binary;
  this.encoding = 'utf8';
  // filled in from the backendKeyData message during connect()
  this.processID = null;
  this.secretKey = null;
  this.ssl = this.connectionParameters.ssl || false;
};
|
|
|
|
// Client instances are event emitters (the constructor calls EventEmitter.call(this))
util.inherits(Client, EventEmitter);
|
|
|
|
/**
 * Starts the underlying connection and wires its events to this client.
 *
 * Sequence driven by connection events:
 *  - 'connect': request SSL when `this.ssl` is truthy, otherwise send the
 *    startup message; 'sslconnect' then sends the startup message.
 *  - password requests are answered with `this.password` (looked up through
 *    pgPass first when the password is null).
 *  - the first 'readyForQuery' installs the per-query delegating handlers,
 *    invokes `callback(null, client)` and emits 'connect' on the client.
 *
 * @param {Function} [callback] invoked at most once from this method:
 *   `callback(null, client)` on the first 'readyForQuery', or
 *   `callback(error)` when an 'error' or 'end' event arrives before that.
 *   Once it has been invoked, later errors are emitted on the client instead.
 */
Client.prototype.connect = function(callback) {
  var self = this;
  var con = this.connection;

  // a host beginning with '/' is turned into a single socket path argument
  if(this.host && this.host.indexOf('/') === 0) {
    con.connect(this.host + '/.s.PGSQL.' + this.port);
  } else {
    con.connect(this.port, this.host);
  }

  //once connection is established send startup message
  con.on('connect', function() {
    if(self.ssl) {
      con.requestSsl();
    } else {
      con.startup(self.getStartupConf());
    }
  });

  con.on('sslconnect', function() {
    con.startup(self.getStartupConf());
  });

  // Wraps a password handler: when no password is configured (null), ask
  // pgPass for one first and remember it when it yields a value.
  function checkPgPass(cb) {
    return function(msg) {
      if (null !== self.password) {
        cb(msg);
      } else {
        pgPass(self.connectionParameters, function(pass){
          if (undefined !== pass) {
            self.connectionParameters.password = self.password = pass;
          }
          cb(msg);
        });
      }
    };
  }

  //cleartext password request handling
  con.on('authenticationCleartextPassword', checkPgPass(function() {
    con.password(self.password);
  }));

  //md5 password request handling: "md5" + md5(md5(password + user) + salt)
  con.on('authenticationMD5Password', checkPgPass(function(msg) {
    var inner = Client.md5(self.password + self.user);
    // NOTE(review): `new Buffer(string)` is deprecated in current Node.js;
    // kept because this file does not otherwise rely on Buffer.from.
    var outer = Client.md5(Buffer.concat([new Buffer(inner), msg.salt]));
    var md5password = "md5" + outer;
    con.password(md5password);
  }));

  con.once('backendKeyData', function(msg) {
    self.processID = msg.processID;
    self.secretKey = msg.secretKey;
  });

  //hook up query handling events to connection
  //after the connection initially becomes ready for queries
  con.once('readyForQuery', function() {

    //delegate rowDescription to active query
    con.on('rowDescription', function(msg) {
      self.activeQuery.handleRowDescription(msg);
    });

    //delegate dataRow to active query
    con.on('dataRow', function(msg) {
      self.activeQuery.handleDataRow(msg);
    });

    //delegate portalSuspended to active query
    con.on('portalSuspended', function() {
      self.activeQuery.handlePortalSuspended(con);
    });

    //delegate emptyQuery to active query
    con.on('emptyQuery', function() {
      self.activeQuery.handleEmptyQuery(con);
    });

    //delegate commandComplete to active query
    con.on('commandComplete', function(msg) {
      self.activeQuery.handleCommandComplete(msg, con);
    });

    //if a prepared statement has a name and properly parses
    //we track that its already been executed so we don't parse
    //it again on the same client
    con.on('parseComplete', function() {
      if(self.activeQuery.name) {
        con.parsedStatements[self.activeQuery.name] = true;
      }
    });

    con.on('copyInResponse', function() {
      self.activeQuery.handleCopyInResponse(self.connection);
    });

    con.on('copyData', function (msg) {
      self.activeQuery.handleCopyData(msg, self.connection);
    });

    con.on('notification', function(msg) {
      self.emit('notification', msg);
    });

    //process possible callback argument to Client#connect
    if (callback) {
      callback(null, self);
      //remove callback for proper error handling
      //after the connect event
      callback = null;
    }
    self.emit('connect');
  });

  //every readyForQuery: clear the active query, dispatch the next queued
  //one, then let the finished query know the connection is ready again
  con.on('readyForQuery', function() {
    var activeQuery = self.activeQuery;
    self.activeQuery = null;
    self.readyForQuery = true;
    self._pulseQueryQueue();
    if(activeQuery) {
      activeQuery.handleReadyForQuery(con);
    }
  });

  //error routing: active query first, then the client's 'error' event once
  //connect has completed, otherwise the pending connect callback
  con.on('error', function(error) {
    if(self.activeQuery) {
      var activeQuery = self.activeQuery;
      self.activeQuery = null;
      return activeQuery.handleError(error, con);
    }
    if(!callback) {
      return self.emit('error', error);
    }
    con.end(); // make sure ECONNRESET errors don't cause error events
    callback(error);
    callback = null;
  });

  con.once('end', function() {
    if ( callback ) {
      // haven't received a connection message yet !
      var err = new Error('Connection terminated');
      callback(err);
      callback = null;
      return;
    }
    if(self.activeQuery) {
      var disconnectError = new Error('Connection terminated');
      self.activeQuery.handleError(disconnectError, con);
      self.activeQuery = null;
    }
    self.emit('end');
  });

  con.on('notice', function(msg) {
    self.emit('notice', msg);
  });

};
|
|
|
|
/**
 * Builds the parameter object sent with the startup message.
 *
 * @returns {Object} `{ user, database }` taken from connectionParameters,
 *   plus `application_name` when either `application_name` or
 *   `fallback_application_name` is truthy (the former takes precedence).
 */
Client.prototype.getStartupConf = function() {
  var params = this.connectionParameters;

  var data = {
    user: params.user,
    database: params.database
  };

  var appName = params.application_name || params.fallback_application_name;
  if (appName) {
    data.application_name = appName;
  }

  return data;
};
|
|
|
|
/**
 * Cancels a query belonging to another client.
 *
 * When `query` is the target client's active query, THIS client's connection
 * is opened (to this client's host/port) and, once connected, a cancel
 * message carrying the target's processID/secretKey is sent. When the query
 * is still waiting in the target's queue it is simply removed from it.
 * Anything else is a no-op.
 *
 * @param {Client} client the client that owns the query
 * @param {Object} query the query to cancel
 */
Client.prototype.cancel = function(client, query) {
  // strict comparison: a missing query must not match an idle (null) activeQuery
  if(client.activeQuery === query) {
    var con = this.connection;

    if(this.host && this.host.indexOf('/') === 0) {
      con.connect(this.host + '/.s.PGSQL.' + this.port);
    } else {
      con.connect(this.port, this.host);
    }

    //once connection is established send cancel message
    con.on('connect', function() {
      con.cancel(client.processID, client.secretKey);
    });
    return;
  }

  var queuedAt = client.queryQueue.indexOf(query);
  if(queuedAt !== -1) {
    client.queryQueue.splice(queuedAt, 1);
  }
};
|
|
|
|
/**
 * Registers a type parser on this client's TypeOverrides instance.
 * Arguments are forwarded unchanged to `this._types.setTypeParser`.
 *
 * @returns {*} whatever `this._types.setTypeParser` returns
 */
Client.prototype.setTypeParser = function(oid, format, parseFn) {
  return this._types.setTypeParser(oid, format, parseFn);
};
|
|
|
|
/**
 * Looks up a type parser on this client's TypeOverrides instance.
 * Arguments are forwarded unchanged to `this._types.getTypeParser`.
 *
 * @returns {*} whatever `this._types.getTypeParser` returns
 */
Client.prototype.getTypeParser = function(oid, format) {
  return this._types.getTypeParser(oid, format);
};
|
|
|
|
// Ported from PostgreSQL 9.2.4 source code in src/interfaces/libpq/fe-exec.c
/**
 * Quotes a string for use as an SQL identifier: the value is wrapped in
 * double quotes and every embedded double quote is doubled.
 *
 * @param {string} str identifier to escape
 * @returns {string} the quoted identifier
 */
Client.prototype.escapeIdentifier = function(str) {
  var escaped = '"';

  for(var i = 0; i < str.length; i++) {
    var c = str[i];
    // a literal double quote is written as two double quotes
    escaped += (c === '"') ? c + c : c;
  }

  escaped += '"';

  return escaped;
};
|
|
|
|
// Ported from PostgreSQL 9.2.4 source code in src/interfaces/libpq/fe-exec.c
/**
 * Quotes a string for use as an SQL string literal: the value is wrapped in
 * single quotes, embedded single quotes and backslashes are doubled, and
 * when any backslash was present the result is prefixed with ' E'
 * (including the leading space).
 *
 * @param {string} str value to escape
 * @returns {string} the quoted literal
 */
Client.prototype.escapeLiteral = function(str) {
  var hasBackslash = false;
  var escaped = '\'';

  for(var i = 0; i < str.length; i++) {
    var c = str[i];
    if(c === '\'') {
      escaped += c + c;
    } else if (c === '\\') {
      escaped += c + c;
      hasBackslash = true;
    } else {
      escaped += c;
    }
  }

  escaped += '\'';

  // backslashes were doubled above, so mark the literal as an escape string
  if(hasBackslash === true) {
    escaped = ' E' + escaped;
  }

  return escaped;
};
|
|
|
|
/**
 * Dispatches the next queued query when the connection is ready.
 *
 * Does nothing unless `this.readyForQuery` is exactly true. Otherwise the
 * head of the queue becomes the active query and is submitted on the
 * connection; when the queue is empty and at least one query has been
 * executed before, 'drain' is emitted.
 */
Client.prototype._pulseQueryQueue = function() {
  if(this.readyForQuery !== true) {
    return;
  }

  this.activeQuery = this.queryQueue.shift();
  if(this.activeQuery) {
    this.readyForQuery = false;
    this.hasExecuted = true;
    this.activeQuery.submit(this.connection);
  } else if(this.hasExecuted) {
    this.activeQuery = null;
    this.emit('drain');
  }
};
|
|
|
|
/**
 * COPY FROM is not implemented by this client; calling it always throws and
 * points the caller at the pg-copy-streams package.
 *
 * @param {*} text unused
 * @throws {Error} always
 */
Client.prototype.copyFrom = function (text) {
  throw new Error("For PostgreSQL COPY TO/COPY FROM support npm install pg-copy-streams");
};
|
|
|
|
/**
 * COPY TO is not implemented by this client; calling it always throws and
 * points the caller at the pg-copy-streams package.
 *
 * @param {*} text unused
 * @throws {Error} always
 */
Client.prototype.copyTo = function (text) {
  throw new Error("For PostgreSQL COPY TO/COPY FROM support npm install pg-copy-streams");
};
|
|
|
|
/**
 * Queues a query and dispatches it as soon as the connection is ready.
 *
 * @param {*} config an object exposing a `submit` function is queued as-is;
 *   anything else is passed, together with `values` and `callback`, to the
 *   Query constructor
 * @param {*} [values] forwarded to the Query constructor
 * @param {*} [callback] forwarded to the Query constructor
 * @returns {Object} the queued query object
 */
Client.prototype.query = function(config, values, callback) {
  //can take in strings, config object or query object
  var query = (typeof config.submit === 'function') ? config :
    new Query(config, values, callback);

  // a client-wide binary setting is applied to queries that did not ask for it
  if(this.binary && !query.binary) {
    query.binary = true;
  }
  // let the result resolve type parsers through this client's overrides
  if(query._result) {
    query._result._getTypeParser = this._types.getTypeParser.bind(this._types);
  }

  this.queryQueue.push(query);
  this._pulseQueryQueue();
  return query;
};
|
|
|
|
/**
 * Ends the underlying connection.
 *
 * @param {Function} [cb] invoked once when the connection emits 'end'
 */
Client.prototype.end = function(cb) {
  // subscribe before ending so the callback cannot miss an 'end' event that
  // the connection emits synchronously from end()
  if (cb) {
    this.connection.once('end', cb);
  }
  this.connection.end();
};
|
|
|
|
/**
 * Computes the hex-encoded MD5 digest of the given input.
 *
 * @param {string|Buffer} string data to hash; passed to the hash with the
 *   'utf-8' encoding argument
 * @returns {string} hex digest
 */
Client.md5 = function(string) {
  return crypto.createHash('md5').update(string, 'utf-8').digest('hex');
};
|
|
|
|
// expose a Query constructor
Client.Query = Query;

module.exports = Client;
|