/** * db - a module that handles talking to the database (in this case, postgres). * * This module attempts to treat the database as an implementation detail. Methods are made available to * create tables, drop tables, insert rows, select rows, etc., without the caller needing to deal with SQL. */ "use strict"; var util = require("util"); var _ = require("underscore"); var tool = require("./tool"); var when = require("when"); var pg = require("pg"); var schema = require("./schema"); var log = tool.log(); var connectionString = process.argv[3]; // for example: "postgres://postgres:12345@localhost:5432/air" var labels = ["year", "month", "day", "hour", "minute", "second"]; /** * Surrounds s with double quotes. */ function quoteName(s) { return '"' + s + '"'; } /** * Surrounds s with single quotes. */ function quoteValue(s) { return "'" + s + "'"; } /** * Returns a sql statement that drops the specified table. * * @param {Object} tableSpec an object that describes the table structure. This function requires the name: key to * hold the table's name. * @returns {{sql: string, args: Array}} an object {sql: x, args: y} representing a drop table statement. */ exports.dropTable = function dropTable(tableSpec) { return {sql: tool.format("DROP TABLE IF EXISTS {0};", quoteName(tableSpec.name)), args: []}; } /** * Returns a sql statement that creates the specified table if it doesn't already exist. * * @param {Object} tableSpec an object that describes the table structure. This function requires the following keys: * name: the name of the table; * columns: an array of column objects: * name: the name of the column; * type: the type of the column; * modifier: optional definition flags for the column, such as "NOT NULL"; * primary: an optional object to describe the table's primary key: * name: the primary key's name; * columns: an array of one or more column names that contribute to the primary key; * @returns {{sql: string, args: Array}} an object {sql: x, args: y} representing a create table statement. */ exports.createTable = function createTable(tableSpec) { var stmt = tool.format("CREATE TABLE IF NOT EXISTS {0} (\n ", quoteName(tableSpec.name)); stmt += tableSpec.columns.map(function(column) { return tool.format("{0} {1} {2}", quoteName(column.name), column.type, tool.coalesce(column.modifier, "")); }).join(",\n "); if (tableSpec.primary) { stmt += tool.format(",\n CONSTRAINT {0} PRIMARY KEY ({1})", quoteName(tableSpec.primary.name), tableSpec.primary.columns.map(function(columnName) { return quoteName(columnName); }).join(", ")); } stmt += "\n) WITH (OIDS = FALSE);"; if (tableSpec.owner) { stmt += tool.format("\nALTER TABLE {0} OWNER TO {1};", quoteName(tableSpec.name), tableSpec.owner); } return {sql: stmt, args: []}; } /** * Returns a sql statement that either inserts or updates the provided row into the specified table. * * @param {Object} tableSpec an object that describes the table structure. This function requires the following keys: * name: the name of the table; * columns: an array of column objects: * name: the name of the column; * type: the type of the column; * primary: an object to describe the table's primary key: * name: the primary key's name; * columns: an array of one or more column names that contribute to the primary key; * @param {Object} row an object of keys and values where keys correspond to column names. * @returns {{sql: string, args: Array}} an object {sql: x, args: y} representing an upsert statement. */ exports.upsert = function upsert(tableSpec, row) { /* WITH new_values (id, field1, field2) AS ( VALUES (CAST($1 AS foo), CAST($2 AS foo), CAST($3 AS foo))), upsert AS ( UPDATE someTable m SET field1 = nv.field1, field2 = nv.field2 FROM new_values nv WHERE m.id = nv.id RETURNING m.* ) INSERT INTO someTable (id, field1, field2) SELECT id, field1, field2 FROM new_values WHERE NOT EXISTS (SELECT 1 FROM upsert up WHERE up.id = new_values.id) */ var table = quoteName(tableSpec.name); var columns = tableSpec.columns; var quotedNames = columns.map(function(column) { return quoteName(column.name); }); var allQuotedNames = quotedNames.join(", "); function idEqualityExpression(l, r) { return tableSpec.primary.columns.map(function(n) { return tool.format("{0}.{2} = {1}.{2}", l, r, quoteName(n)); }).join(" AND "); } var values = []; var stmt = tool.format("WITH new_values ({0}) AS (\n VALUES (\n {1})),\n", allQuotedNames, columns.map(function(column, i) { var value = row[column.name]; if (value === undefined) { value = null; } values.push(value); return tool.format("CAST(${0} AS {1})", i + 1, column.type); }).join(",\n ")); stmt += tool.format("upsert AS (\n UPDATE {0} m SET\n ", table); var provided = []; columns.forEach(function(column, i) { // Skip assignment of columns that have no value defined for them. This will retain the column value // of the row if it exists. if (row[column.name] !== undefined) { provided.push(quotedNames[i]); } }); stmt += tool.format("{0}\n FROM new_values nv\n WHERE {1}\n RETURNING m.*)\n", provided.map(function(col) {return tool.format("{0} = nv.{0}", col); }).join(",\n "), idEqualityExpression("m", "nv")); stmt += tool.format( "INSERT INTO {0}({1})\nSELECT {1}\nFROM new_values\nWHERE NOT EXISTS (SELECT 1 FROM upsert up WHERE {2});\n", table, allQuotedNames, idEqualityExpression("up", "new_values")); return {sql: stmt, args: values}; } /** * Builds a sql constraint clause for the provided date parts. */ function dateConstraint(date) { var parts = date.parts; var column = quoteName("date"); var table = quoteName(schema.samples.name); if (date.current) { var condition = tool.format("(SELECT MAX({0}) FROM {1})", column, table); if (parts.length > 0) { // "date" = ((select max(date) from samples) + INTERVAL '-1 day -3 hour') order by date condition = tool.format("({0} + INTERVAL '{1}')", condition, parts.map(function(item, i) { return item + " " + labels[i]; }).join(" ")); } return column + " = " + condition; } var str = tool.toISOString({year: parts[0], month: parts[1], day: parts[2], hour: parts[3], zone: date.zone}); return tool.format( "{0} <= {1} AND {1} < CAST({0} AS TIMESTAMP WITH TIME ZONE) + INTERVAL '1 {2}'", quoteValue(str), column, labels[parts.length - 1]); } /** * Builds a sql constraint clause for station id, if necessary. */ function stationIdConstraint(constraints) { return constraints.stationId ? tool.format("{0} = {1}", quoteName("stationId"), constraints.stationId) : null; } /** * Builds a sql constraint clause for sample type, if necessary. */ function sampleTypeConstraint(constraints) { return /*constraints.sampleType ? quoteName(constraints.sampleType) + " IS NOT NULL" :*/ null; } /** * Returns a sql statement that selects samples matching the specified constraints. * * @param {Object} tableSpec * @param {Object} stationTableSpec * @param {Object} constraints an object the describes the constraints for the select, having the form: * date: {current: Boolean, parts: [year, month, day, hour], zone: string}, * sampleType: string column name for sample, or null or "all" if all requested. * stationId: Number id of desired station, or null for all. * @returns {{sql: string, args: Array}} an object {sql: x, args: y} representing a sample select statement. */ exports.selectSamples = function(tableSpec, stationTableSpec, constraints) { log.info(constraints); var dateColumn = quoteName("date"); var idColumn = quoteName("id"); var stationIdColumn = quoteName("stationId"); var longitudeColumn = quoteName("longitude"); var latitudeColumn = quoteName("latitude"); var table = quoteName(tableSpec.name); var stmt = tool.format("SELECT b.{0}, b.{1}, ", longitudeColumn, latitudeColumn); // First, decide which columns to select. if (constraints.sampleType && constraints.sampleType != "all") { // Select only one kind of sample, including the primary key columns. Treat date as text to preserve zone. stmt += tool.format("CAST({0} AS TEXT), {1}, {2} ", dateColumn, stationIdColumn, quoteName(constraints.sampleType)); } else { // Select all sample kinds, including primary key columns. Treat date as text to preserve zone. var allColumns = tableSpec.columns.map(function(col) { return col.name != "date" ? quoteName(col.name) : tool.format("CAST({0} AS TEXT)", dateColumn); }).join(", "); stmt += allColumns; } // Next, constrain the results by date, station id, and sample type, where necessary. stmt += tool.format("\nFROM {0} a INNER JOIN {1} b ON a.{2} = b.{3}", table, quoteName(stationTableSpec.name), stationIdColumn, idColumn); stmt += tool.format("\nWHERE {0}", dateConstraint(constraints.date)); var stationConstraint = stationIdConstraint(constraints); if (stationConstraint) { stmt += " AND " + stationConstraint; } var typeConstraint = sampleTypeConstraint(constraints); if (typeConstraint) { stmt += " AND " + typeConstraint; } // Finally, order by date descending, then station id. return {sql: stmt + tool.format("\nORDER BY {0} DESC, {1};", dateColumn, stationIdColumn), args: []}; } /** * Returns a sql statement that selects all rows from the provided table. * * @param {Object} tableSpec * @returns {{sql: string, args: Array}} an object {sql: x, args: y} representing a select * statement. */ exports.selectAll = function(tableSpec) { var stmt = tool.format("SELECT * FROM {0}", quoteName(tableSpec.name)); if (tableSpec.primary) { stmt += tool.format("\nORDER BY {0}", tableSpec.primary.columns.map(quoteName).join(", ")); } return {sql: stmt, args: []}; } function quoteColumn(name) { return name === "date" ? tool.format("CAST({0} AS TEXT)", quoteName(name)) : quoteName(name); } exports.selectSamplesCompact = function(constraints, columns) { var samples = schema.samples; var stations = schema.stations; var stmt = ""; stmt += tool.format("SELECT {0}\n", columns.map(quoteColumn).join(", ")); stmt += tool.format("FROM {0} s INNER JOIN {1} t ", quoteName(samples.name), quoteName(stations.name)); stmt += tool.format("ON s.{0} = t.{1}\n", quoteName("stationId"), quoteName("id")); stmt += tool.format("WHERE {0}", dateConstraint(constraints.date)); return {sql: stmt, args: []}; } /** * Executes the specified statement, eventually. * * @param {Object} statement an object {sql: text, args: [x, y, z]}, were args are optional. * @returns {promise} a promise for the eventual processing of the statement */ exports.execute = function(statement) { var d = when.defer(); pg.connect(connectionString, function(error, client, done) { if (error) { return d.reject(error); } var sql = typeof statement === "string" ? statement : statement.sql; var args = typeof statement === "string" ? [] : (statement.args || []); log.info(sql + (args.length > 0 ? "; " + args : "")); client.query(sql, args, function(error, result) { done(); if (error) { return d.reject(error); } d.resolve(result); }).on("row", function(row) { d.notify(row); }); }); return d.promise; } /** * Executes the specified statements, eventually. * * @param {Array} statements an array of statement objects {sql: text, args: [x, y, z]}, were args are optional. * @returns {promise} a promise for the eventual processing of the statements */ exports.executeAll = function(statements) { var last = statements.length - 1; if (last < 0) { return when.resolve([]); } var d = when.defer(); pg.connect(connectionString, function(error, client, done) { if (error) { return d.reject(error); } var statementPromises = statements.map(function(statement, index) { var sd = when.defer(); var sql = typeof statement === "string" ? statement : statement.sql; var args = typeof statement === "string" ? [] : (statement.args || []); // log.info(/*sql + */(args.length > 0 ? "; " + args : "")); client.query(sql, args, function(error, result) { if (index == last || error) { done(); } if (error) { return sd.reject(error); } sd.resolve(result); }).on("row", function(row) { sd.notify(row); }); return sd.promise; }); when.all(statementPromises).then(function(a) { d.resolve(a); }); }); return d.promise; } /** * Closes the database component, for shutdown. */ exports.done = function() { pg.end(); }