air/db.js
2013-10-15 19:25:21 +09:00

365 lines
14 KiB
JavaScript

/**
* db - a module that handles talking to the database (in this case, postgres).
*
* This module attempts to treat the database as an implementation detail. Methods are made available to
* create tables, drop tables, insert rows, select rows, etc., without the caller needing to deal with SQL.
*/
"use strict";
var util = require("util");
var _ = require("underscore");
var tool = require("./tool");
var when = require("when");
var pg = require("pg");
var schema = require("./schema");
var log = tool.log();
var connectionString = process.argv[3]; // for example: "postgres://postgres:12345@localhost:5432/air"
var labels = ["year", "month", "day", "hour", "minute", "second"];
/**
* Surrounds s with double quotes.
*/
function quoteName(s) {
return '"' + s + '"';
}
/**
* Surrounds s with single quotes.
*/
function quoteValue(s) {
return "'" + s + "'";
}
/**
* Returns a sql statement that drops the specified table.
*
* @param {Object} tableSpec an object that describes the table structure. This function requires the name: key to
* hold the table's name.
* @returns {{sql: string, args: Array}} an object {sql: x, args: y} representing a drop table statement.
*/
exports.dropTable = function dropTable(tableSpec) {
return {sql: tool.format("DROP TABLE IF EXISTS {0};", quoteName(tableSpec.name)), args: []};
}
/**
* Returns a sql statement that creates the specified table if it doesn't already exist.
*
* @param {Object} tableSpec an object that describes the table structure. This function requires the following keys:
* name: the name of the table;
* columns: an array of column objects:
* name: the name of the column;
* type: the type of the column;
* modifier: optional definition flags for the column, such as "NOT NULL";
* primary: an optional object to describe the table's primary key:
* name: the primary key's name;
* columns: an array of one or more column names that contribute to the primary key;
* @returns {{sql: string, args: Array}} an object {sql: x, args: y} representing a create table statement.
*/
exports.createTable = function createTable(tableSpec) {
var stmt = tool.format("CREATE TABLE IF NOT EXISTS {0} (\n ", quoteName(tableSpec.name));
stmt += tableSpec.columns.map(function(column) {
return tool.format("{0} {1} {2}", quoteName(column.name), column.type, tool.coalesce(column.modifier, ""));
}).join(",\n ");
if (tableSpec.primary) {
stmt += tool.format(",\n CONSTRAINT {0} PRIMARY KEY ({1})",
quoteName(tableSpec.primary.name),
tableSpec.primary.columns.map(function(columnName) { return quoteName(columnName); }).join(", "));
}
stmt += "\n) WITH (OIDS = FALSE);";
if (tableSpec.owner) {
stmt += tool.format("\nALTER TABLE {0} OWNER TO {1};", quoteName(tableSpec.name), tableSpec.owner);
}
return {sql: stmt, args: []};
}
/**
* Returns a sql statement that either inserts or updates the provided row into the specified table.
*
* @param {Object} tableSpec an object that describes the table structure. This function requires the following keys:
* name: the name of the table;
* columns: an array of column objects:
* name: the name of the column;
* type: the type of the column;
* primary: an object to describe the table's primary key:
* name: the primary key's name;
* columns: an array of one or more column names that contribute to the primary key;
* @param {Object} row an object of keys and values where keys correspond to column names.
* @returns {{sql: string, args: Array}} an object {sql: x, args: y} representing an upsert statement.
*/
exports.upsert = function upsert(tableSpec, row) {
/* WITH new_values (id, field1, field2) AS (
VALUES (CAST($1 AS foo), CAST($2 AS foo), CAST($3 AS foo))),
upsert AS (
UPDATE someTable m
SET field1 = nv.field1,
field2 = nv.field2
FROM new_values nv
WHERE m.id = nv.id
RETURNING m.*
)
INSERT INTO someTable (id, field1, field2)
SELECT id, field1, field2
FROM new_values
WHERE NOT EXISTS (SELECT 1 FROM upsert up WHERE up.id = new_values.id) */
var table = quoteName(tableSpec.name);
var columns = tableSpec.columns;
var quotedNames = columns.map(function(column) { return quoteName(column.name); });
var allQuotedNames = quotedNames.join(", ");
function idEqualityExpression(l, r) {
return tableSpec.primary.columns.map(function(n) {
return tool.format("{0}.{2} = {1}.{2}", l, r, quoteName(n));
}).join(" AND ");
}
var values = [];
var stmt = tool.format("WITH new_values ({0}) AS (\n VALUES (\n {1})),\n",
allQuotedNames,
columns.map(function(column, i) {
var value = row[column.name];
if (value === undefined) {
value = null;
}
values.push(value);
return tool.format("CAST(${0} AS {1})", i + 1, column.type);
}).join(",\n "));
stmt += tool.format("upsert AS (\n UPDATE {0} m SET\n ", table);
var provided = [];
columns.forEach(function(column, i) {
// Skip assignment of columns that have no value defined for them. This will retain the column value
// of the row if it exists.
if (row[column.name] !== undefined) {
provided.push(quotedNames[i]);
}
});
stmt += tool.format("{0}\n FROM new_values nv\n WHERE {1}\n RETURNING m.*)\n",
provided.map(function(col) {return tool.format("{0} = nv.{0}", col); }).join(",\n "),
idEqualityExpression("m", "nv"));
stmt += tool.format(
"INSERT INTO {0}({1})\nSELECT {1}\nFROM new_values\nWHERE NOT EXISTS (SELECT 1 FROM upsert up WHERE {2});\n",
table,
allQuotedNames,
idEqualityExpression("up", "new_values"));
return {sql: stmt, args: values};
}
/**
* Builds a sql constraint clause for the provided date parts.
*/
function dateConstraint(date) {
var parts = date.parts;
var column = quoteName("date");
var table = quoteName(schema.samples.name);
if (date.current) {
var condition = tool.format("(SELECT MAX({0}) FROM {1})", column, table);
if (parts.length > 0) {
// "date" = ((select max(date) from samples) + INTERVAL '-1 day -3 hour') order by date
condition = tool.format("({0} + INTERVAL '{1}')",
condition,
parts.map(function(item, i) {
return item + " " + labels[i];
}).join(" "));
}
return column + " = " + condition;
}
var str = tool.toISOString({year: parts[0], month: parts[1], day: parts[2], hour: parts[3], zone: date.zone});
return tool.format(
"{0} <= {1} AND {1} < CAST({0} AS TIMESTAMP WITH TIME ZONE) + INTERVAL '1 {2}'",
quoteValue(str),
column,
labels[parts.length - 1]);
}
/**
* Builds a sql constraint clause for station id, if necessary.
*/
function stationIdConstraint(constraints) {
return constraints.stationId ? tool.format("{0} = {1}", quoteName("stationId"), constraints.stationId) : null;
}
/**
* Builds a sql constraint clause for sample type, if necessary.
*/
function sampleTypeConstraint(constraints) {
return /*constraints.sampleType ? quoteName(constraints.sampleType) + " IS NOT NULL" :*/ null;
}
/**
* Returns a sql statement that selects samples matching the specified constraints.
*
* @param {Object} tableSpec
* @param {Object} stationTableSpec
* @param {Object} constraints an object the describes the constraints for the select, having the form:
* date: {current: Boolean, parts: [year, month, day, hour], zone: string},
* sampleType: string column name for sample, or null or "all" if all requested.
* stationId: Number id of desired station, or null for all.
* @returns {{sql: string, args: Array}} an object {sql: x, args: y} representing a sample select statement.
*/
exports.selectSamples = function(tableSpec, stationTableSpec, constraints) {
log.info(constraints);
var dateColumn = quoteName("date");
var idColumn = quoteName("id");
var stationIdColumn = quoteName("stationId");
var longitudeColumn = quoteName("longitude");
var latitudeColumn = quoteName("latitude");
var table = quoteName(tableSpec.name);
var stmt = tool.format("SELECT b.{0}, b.{1}, ", longitudeColumn, latitudeColumn);
// First, decide which columns to select.
if (constraints.sampleType && constraints.sampleType != "all") {
// Select only one kind of sample, including the primary key columns. Treat date as text to preserve zone.
stmt += tool.format("CAST({0} AS TEXT), {1}, {2} ",
dateColumn,
stationIdColumn,
quoteName(constraints.sampleType));
}
else {
// Select all sample kinds, including primary key columns. Treat date as text to preserve zone.
var allColumns = tableSpec.columns.map(function(col) {
return col.name != "date" ? quoteName(col.name) : tool.format("CAST({0} AS TEXT)", dateColumn);
}).join(", ");
stmt += allColumns;
}
// Next, constrain the results by date, station id, and sample type, where necessary.
stmt += tool.format("\nFROM {0} a INNER JOIN {1} b ON a.{2} = b.{3}", table, quoteName(stationTableSpec.name), stationIdColumn, idColumn);
stmt += tool.format("\nWHERE {0}", dateConstraint(constraints.date));
var stationConstraint = stationIdConstraint(constraints);
if (stationConstraint) {
stmt += " AND " + stationConstraint;
}
var typeConstraint = sampleTypeConstraint(constraints);
if (typeConstraint) {
stmt += " AND " + typeConstraint;
}
// Finally, order by date descending, then station id.
return {sql: stmt + tool.format("\nORDER BY {0} DESC, {1};", dateColumn, stationIdColumn), args: []};
}
/**
* Returns a sql statement that selects all rows from the provided table.
*
* @param {Object} tableSpec
* @returns {{sql: string, args: Array}} an object {sql: x, args: y} representing a select * statement.
*/
exports.selectAll = function(tableSpec) {
var stmt = tool.format("SELECT * FROM {0}", quoteName(tableSpec.name));
if (tableSpec.primary) {
stmt += tool.format("\nORDER BY {0}", tableSpec.primary.columns.map(quoteName).join(", "));
}
return {sql: stmt, args: []};
}
function quoteColumn(name) {
return name === "date" ?
tool.format("CAST({0} AS TEXT)", quoteName(name)) :
quoteName(name);
}
exports.selectSamplesCompact = function(constraints, columns) {
var samples = schema.samples;
var stations = schema.stations;
var stmt = "";
stmt += tool.format("SELECT {0}\n", columns.map(quoteColumn).join(", "));
stmt += tool.format("FROM {0} s INNER JOIN {1} t ", quoteName(samples.name), quoteName(stations.name));
stmt += tool.format("ON s.{0} = t.{1}\n", quoteName("stationId"), quoteName("id"));
stmt += tool.format("WHERE {0}", dateConstraint(constraints.date));
return {sql: stmt, args: []};
}
/**
* Executes the specified statement, eventually.
*
* @param {Object} statement an object {sql: text, args: [x, y, z]}, were args are optional.
* @returns {promise} a promise for the eventual processing of the statement
*/
exports.execute = function(statement) {
var d = when.defer();
pg.connect(connectionString, function(error, client, done) {
if (error) {
return d.reject(error);
}
var sql = typeof statement === "string" ? statement : statement.sql;
var args = typeof statement === "string" ? [] : (statement.args || []);
log.info(sql + (args.length > 0 ? "; " + args : ""));
client.query(sql, args, function(error, result) {
done();
if (error) {
return d.reject(error);
}
d.resolve(result);
}).on("row", function(row) {
d.notify(row);
});
});
return d.promise;
}
/**
* Executes the specified statements, eventually.
*
* @param {Array} statements an array of statement objects {sql: text, args: [x, y, z]}, were args are optional.
* @returns {promise} a promise for the eventual processing of the statements
*/
exports.executeAll = function(statements) {
var last = statements.length - 1;
if (last < 0) {
return when.resolve([]);
}
var d = when.defer();
pg.connect(connectionString, function(error, client, done) {
if (error) {
return d.reject(error);
}
var statementPromises = statements.map(function(statement, index) {
var sd = when.defer();
var sql = typeof statement === "string" ? statement : statement.sql;
var args = typeof statement === "string" ? [] : (statement.args || []);
// log.info(/*sql + */(args.length > 0 ? "; " + args : ""));
client.query(sql, args, function(error, result) {
if (index == last || error) {
done();
}
if (error) {
return sd.reject(error);
}
sd.resolve(result);
}).on("row", function(row) {
sd.notify(row);
});
return sd.promise;
});
when.all(statementPromises).then(function(a) {
d.resolve(a);
});
});
return d.promise;
}
/**
* Closes the database component, for shutdown.
*/
exports.done = function() {
pg.end();
}