Merge remote branch 'upstream/master'

Conflicts:
	lib/query.js
This commit is contained in:
Alexander Sulfrian 2011-03-02 14:21:33 +01:00
commit 67cb9f8196
9 changed files with 241 additions and 147 deletions

View File

@ -60,10 +60,10 @@ with love and TDD.
- mucho testing
~250 tests executed on
- ubuntu
- node v0.2.2, v0.2.3, v0.2.4, v0.2.5, v0.3.0, v0.3.1
- node v0.2.2, v0.2.3, v0.2.4, v0.2.5, v0.2.6, v0.3.0, v0.3.1, v0.3.2, v0.3.3, v0.3.4, v0.3.5, v0.3.6, v0.3.7, v0.3.8
- postgres 8.4.4
- osx
- node v0.2.2, v0.2.3, v0.2.4, v0.2.5, v0.3.0, v0.3.1
- node v0.2.2, v0.2.3, v0.2.4, v0.2.5, v0.2.6, v0.3.0, v0.3.1, v0.3.2, v0.3.3, v0.3.4, v0.3.5, v0.3.6, v0.3.7, v0.3.8
- postgres v8.4.4, v9.0.1 installed both locally and on networked Windows 7
## Contributing

View File

@ -0,0 +1,83 @@
var pg = require(__dirname + '/../lib')
var bencher = require('bencher');
var helper = require(__dirname + '/../test/test-helper')
var conString = helper.connectionString()
var round = function(num) {
return Math.round((num*1000))/1000
}
var doBenchmark = function() {
var bench = bencher({
name: 'select large sets',
repeat: 10,
actions: [{
name: 'selecting string',
run: function(next) {
var query = client.query('SELECT name FROM items');
query.on('end', function() {
next();
});
}
}, {
name: 'selecting integer',
run: function(next) {
var query = client.query('SELECT count FROM items');
query.on('end', function() {
next();
})
}
}, {
name: 'selecting date',
run: function(next) {
var query = client.query('SELECT created FROM items');
query.on('end', function() {
next();
})
}
}, {
name: 'selecting row',
run: function(next) {
var query = client.query('SELECT * FROM items');
query.on('end', function() {
next();
})
}
}, {
name: 'loading all rows into memory',
run: function(next) {
var query = client.query('SELECT * FROM items', next);
}
}]
});
bench(function(result) {
console.log();
console.log("%s (%d repeats):", result.name, result.repeat)
result.actions.forEach(function(action) {
console.log(" %s: \n average: %d ms\n total: %d ms", action.name, round(action.meanTime), round(action.totalTime));
})
client.end();
})
}
var client = new pg.Client(conString);
client.connect();
console.log();
console.log("creating temp table");
client.query("CREATE TEMP TABLE items(name VARCHAR(10), created TIMESTAMPTZ, count INTEGER)");
var count = 10000;
console.log("inserting %d rows", count);
for(var i = 0; i < count; i++) {
var query = {
name: 'insert',
text: "INSERT INTO items(name, created, count) VALUES($1, $2, $3)",
values: ["item"+i, new Date(2010, 01, 01, i, 0, 0), i]
};
client.query(query);
}
client.once('drain', function() {
console.log('done with insert. executing benchmark.');
doBenchmark();
});

View File

@ -1,28 +1,16 @@
var sys = require('sys');
var net = require('net');
var crypto = require('crypto');
var EventEmitter = require('events').EventEmitter;
var url = require('url');
var Query = require(__dirname + '/query');
var utils = require(__dirname + '/utils');
var defaults = require(__dirname + '/defaults');
var Connection = require(__dirname + '/connection');
var parseConnectionString = function(str) {
var result = url.parse(str);
result.host = result.hostname;
result.database = result.pathname ? result.pathname.slice(1) : null
var auth = (result.auth || ':').split(':');
result.user = auth[0];
result.password = auth[1];
return result;
};
var Client = function(config) {
EventEmitter.call(this);
if(typeof config === 'string') {
config = parseConnectionString(config)
config = utils.parseConnectionString(config)
}
config = config || {};
this.user = config.user || defaults.user;
@ -30,7 +18,7 @@ var Client = function(config) {
this.port = config.port || defaults.port;
this.host = config.host || defaults.host;
this.queryQueue = [];
this.connection = config.connection || new Connection({stream: config.stream || new net.Stream()});
this.connection = config.connection || new Connection({stream: config.stream});
this.queryQueue = [];
this.password = config.password || defaults.password;
this.encoding = 'utf8';
@ -66,27 +54,63 @@ p.connect = function() {
con.password(md5password);
});
//hook up query handling events to connection
//after the connection initially becomes ready for queries
con.once('readyForQuery', function() {
//delegate row descript to active query
con.on('rowDescription', function(msg) {
self.activeQuery.handleRowDescription(msg);
});
//delegate datarow to active query
con.on('dataRow', function(msg) {
self.activeQuery.handleDataRow(msg);
});
//TODO should query gain access to connection?
con.on('portalSuspended', function(msg) {
self.activeQuery.getRows(con);
});
con.on('commandComplete', function(msg) {
//delegate command complete to query
self.activeQuery.handleCommandComplete(msg);
//need to sync after each command complete of a prepared statement
if(self.activeQuery.isPreparedStatement) {
con.sync();
}
});
});
con.on('readyForQuery', function() {
self.readyForQuery = true;
if(self.activeQuery) {
self.activeQuery.handleReadyForQuery();
}
this.activeQuery = null;
self.pulseQueryQueue();
self.readyForQuery = true;
self._pulseQueryQueue();
});
con.on('error', function(error) {
if(!self.activeQuery) {
self.emit('error', error);
} else {
//need to sync after error during a prepared statement
if(self.activeQuery.isPreparedStatement) {
con.sync();
}
self.activeQuery.handleError(error);
self.activeQuery = null;
}
});
};
p.pulseQueryQueue = function() {
p._pulseQueryQueue = function() {
if(this.readyForQuery===true) {
if(this.queryQueue.length > 0) {
this.activeQuery = this.queryQueue.shift();
if(this.activeQuery) {
this.readyForQuery = false;
var query = this.queryQueue.shift();
this.activeQuery = query;
this.hasExecuted = true;
query.submit(this.connection);
this.activeQuery.submit(this.connection);
} else if(this.hasExecuted) {
this.activeQuery = null;
this.emit('drain')
@ -97,21 +121,20 @@ p.pulseQueryQueue = function() {
p.query = function(config, values, callback) {
//can take in strings or config objects
config = (config.text || config.name) ? config : { text: config };
if(values) {
if(typeof values === 'function') {
callback = values;
}
else {
} else {
config.values = values;
}
}
if(callback) {
config.callback = callback;
}
config.callback = callback;
var query = new Query(config);
this.queryQueue.push(query);
this.pulseQueryQueue();
this._pulseQueryQueue();
return query;
};

View File

@ -15,6 +15,10 @@ var Query = function(config) {
//set or used until a rowDescription message comes in
this.rowDescription = null;
this.callback = config.callback;
this._fieldNames = [];
this._fieldConverters = [];
this._result = new Result();
this.isPreparedStatement = false;
EventEmitter.call(this);
};
@ -30,9 +34,13 @@ var noParse = function(val) {
return val;
};
//creates datarow metatdata from the supplied
//data row information
var buildDataRowMetadata = function(msg, converters, names) {
//associates row metadata from the supplied
//message with this query object
//metadata used when parsing row results
p.handleRowDescription = function(msg) {
this._fieldNames = [];
this._fieldConverters = [];
var parsers = {
text: new TextParser(),
binary: new BinaryParser()
@ -42,51 +50,93 @@ var buildDataRowMetadata = function(msg, converters, names) {
for(var i = 0; i < len; i++) {
var field = msg.fields[i];
var dataTypeId = field.dataTypeID;
var format = field.format;
names[i] = field.name;
this._fieldNames[i] = field.name;
switch(dataTypeId) {
case 20:
converters[i] = parsers[format].parseInt64;
this._fieldConverters[i] = parsers[format].parseInt64;
break;
case 21:
converters[i] = parsers[format].parseInt16;
this._fieldConverters[i] = parsers[format].parseInt16;
break;
case 23:
converters[i] = parsers[format].parseInt32;
this._fieldConverters[i] = parsers[format].parseInt32;
break;
case 26:
converters[i] = parsers[format].parseInt64;
this._fieldConverters[i] = parsers[format].parseInt64;
break;
case 700:
converters[i] = parsers[format].parseFloat32;
this._fieldConverters[i] = parsers[format].parseFloat32;
break;
case 701:
converters[i] = parsers[format].parseFloat64;
this._fieldConverters[i] = parsers[format].parseFloat64;
break;
case 1700:
converters[i] = parsers[format].parseNumeric;
this._fieldConverters[i] = parsers[format].parseNumeric;
break;
case 16:
converters[i] = parsers[format].parseBool;
this._fieldConverters[i] = parsers[format].parseBool;
break;
case 1114:
case 1184:
converters[i] = parsers[format].parseDate;
this._fieldConverters[i] = parsers[format].parseDate;
break;
case 1008:
case 1009:
converters[i] = parsers[format].parseStringArray;
this._fieldConverters[i] = parsers[format].parseStringArray;
break;
case 1007:
case 1016:
converters[i] = parsers[format].parseIntArray;
this._fieldConverters[i] = parsers[format].parseIntArray;
break;
default:
converters[i] = dataTypeParsers[dataTypeId] || noParse;
this._fieldConverters[i] = dataTypeParsers[dataTypeId] || noParse;
break;
}
};
}
};
p.handleDataRow = function(msg) {
var self = this;
var row = {};
for(var i = 0; i < msg.fields.length; i++) {
var rawValue = msg.fields[i];
if(rawValue === null) {
//leave null values alone
row[self._fieldNames[i]] = null;
} else {
//convert value to javascript
row[self._fieldNames[i]] = self._fieldConverters[i](rawValue);
}
}
self.emit('row', row);
//if there is a callback collect rows
if(self.callback) {
self._result.addRow(row);
}
};
p.handleCommandComplete = function(msg) {
this._result.addCommandComplete(msg);
};
p.handleReadyForQuery = function() {
if(this.callback) {
this.callback(null, this._result);
}
this.emit('end', this._result);
};
p.handleError = function(err) {
//if callback supplied do not emit error event as uncaught error
//events will bubble up to node process
if(this.callback) {
this.callback(err)
} else {
this.emit('error', err);
}
this.emit('end');
};
p.submit = function(connection) {
var self = this;
@ -95,75 +145,26 @@ p.submit = function(connection) {
} else {
connection.query(this.text);
}
var converters = [];
var names = [];
var handleRowDescription = function(msg) {
buildDataRowMetadata(msg, converters, names);
};
var result = new Result();
var handleDatarow = function(msg) {
var row = {};
for(var i = 0; i < msg.fields.length; i++) {
var rawValue = msg.fields[i];
row[names[i]] = rawValue === null ? null : converters[i](rawValue);
}
self.emit('row', row);
//if there is a callback collect rows
if(self.callback) {
result.addRow(row);
}
};
var onCommandComplete = function(msg) {
result.addCommandComplete(msg);
};
var onError = function(err) {
//remove all listeners
removeListeners();
if(self.callback) {
self.callback(err);
} else {
self.emit('error', err);
}
self.emit('end');
};
var onReadyForQuery = function() {
removeListeners();
if(self.callback) {
self.callback(null, result);
}
self.emit('end', result);
};
var removeListeners = function() {
//remove all listeners
connection.removeListener('rowDescription', handleRowDescription);
connection.removeListener('dataRow', handleDatarow);
connection.removeListener('readyForQuery', onReadyForQuery);
connection.removeListener('commandComplete', onCommandComplete);
connection.removeListener('error', onError);
};
connection.on('rowDescription', handleRowDescription);
connection.on('dataRow', handleDatarow);
connection.on('readyForQuery', onReadyForQuery);
connection.on('commandComplete', onCommandComplete);
connection.on('error', onError);
};
p.hasBeenParsed = function(connection) {
return this.name && connection.parsedStatements[this.name];
};
p.getRows = function(connection) {
connection.execute({
portal: this.name,
rows: this.rows
});
connection.flush();
};
p.prepare = function(connection) {
var self = this;
//prepared statements need sync to be called after each command
//complete or when an error is encountered
this.isPreparedStatement = true;
//TODO refactor this poor encapsulation
if(!this.hasBeenParsed(connection)) {
connection.parse({
text: self.text,
@ -193,27 +194,7 @@ p.prepare = function(connection) {
name: self.name || ""
});
var getRows = function() {
connection.execute({
portal: self.name,
rows: self.rows
});
connection.flush();
};
getRows();
var onCommandComplete = function() {
connection.removeListener('error', onCommandComplete);
connection.removeListener('commandComplete', onCommandComplete);
connection.removeListener('portalSuspended', getRows);
connection.sync();
};
connection.on('portalSuspended', getRows);
connection.on('commandComplete', onCommandComplete);
connection.on('error', onCommandComplete);
this.getRows(connection);
};
var dataTypeParsers = {

View File

@ -1,3 +1,4 @@
var url = require('url');
var events = require('events');
var sys = require('sys');
@ -75,5 +76,14 @@ p._pulse = function(item, cb) {
}
module.exports = {
Pool: Pool
Pool: Pool,
parseConnectionString: function(str) {
var result = url.parse(str);
result.host = result.hostname;
result.database = result.pathname ? result.pathname.slice(1) : null
var auth = (result.auth || ':').split(':');
result.user = auth[0];
result.password = auth[1];
return result;
}
}

View File

@ -1,5 +1,5 @@
{ "name": "pg",
"version": "0.2.6",
"version": "0.2.7",
"description": "Pure JavaScript PostgreSQL client",
"homepage": "http://github.com/brianc/node-postgres",
"repository" : {

View File

@ -2,7 +2,15 @@ var helper = require(__dirname+'/test-helper');
var client = helper.client();
test("empty query message handling", function() {
client.query("");
assert.emits(client.connection, 'emptyQuery');
var query = client.query("");
assert.emits(query, 'end');
client.on('drain', client.end.bind(client));
});
test('callback supported', assert.calls(function() {
client.query("", function(err, result) {
assert.isNull(err);
assert.empty(result.rows);
})
}))

View File

@ -32,20 +32,11 @@ test("named prepared statement", function() {
name: queryName
});
test("is parsed", function() {
client.connection.on('parseComplete', function() {
parseCount++;
});
});
assert.emits(query, 'row', function(row) {
assert.equal(row.name, 'Brian');
});
assert.emits(query, 'end', function() {
test("query was parsed", function() {
assert.equal(parseCount, 1);
});
});
});
@ -61,9 +52,6 @@ test("named prepared statement", function() {
});
assert.emits(cachedQuery, 'end', function() {
test("query was only parsed one time", function() {
assert.equal(parseCount, 1, "Should not have reparsed query");
});
});
});
@ -87,7 +75,7 @@ test("named prepared statement", function() {
});
assert.emits(q, 'end', function() {
assert.equal(parseCount, 1);
});
});
});

View File

@ -110,6 +110,7 @@ test('executing query', function() {
});
test('removes itself after another readyForQuery message', function() {
return false;
assert.emits(query, "end", function(msg) {
//TODO do we want to check the complete messages?
});