switch to a proper logging framework, and introduce smarter scraping logic that attempts to minimize the delay between data becoming available and a scrape occurring.

This commit is contained in:
Cameron Beccario 2013-09-20 23:16:21 +09:00
parent e065077759
commit 1b3f18cabd
6 changed files with 144 additions and 32 deletions

6
api.js
View File

@ -7,6 +7,7 @@ var express = require("express");
var when = require("when");
var db = require("./db");
var tool = require("./tool");
var log = tool.log();
var port = process.argv[2];
@ -178,8 +179,7 @@ app.use(logger("[:date] :remote-addr :method :url HTTP/:http-version:headers"));
//});
function handleUnexpected(res, error) {
console.error(error);
console.error(error.stack);
log.error(error.stack);
res.send(500);
}
@ -366,4 +366,4 @@ app.get("/map/:year/:month/:day/:hour", function(req, res) {
app.use(express.static(__dirname + "/public"));
app.listen(port);
console.log(tool.format("Listening on port {0}...", port));
log.info(tool.format("Listening on port {0}...", port));

15
db.js
View File

@ -6,6 +6,7 @@ var tool = require("./tool");
var when = require("when");
var pg = require("pg");
var schema = require("./schema");
var log = tool.log();
var connectionString = process.argv[3]; // for example: "postgres://postgres:12345@localhost:5432/air"
var labels = ["year", "month", "day", "hour", "minute", "second"];
@ -194,7 +195,7 @@ function sampleTypeConstraint(constraints) {
* @returns {{sql: string, args: Array}} an object {sql: x, args: y} representing a sample select statement.
*/
exports.selectSamples = function(tableSpec, stationTableSpec, constraints) {
console.log(constraints);
log.info(constraints);
var dateColumn = quoteName("date");
var idColumn = quoteName("id");
var stationIdColumn = quoteName("stationId");
@ -285,7 +286,7 @@ exports.execute = function(statement) {
var sql = typeof statement === "string" ? statement : statement.sql;
var args = typeof statement === "string" ? [] : (statement.args || []);
console.log(sql + (args.length > 0 ? "; " + args : ""));
log.info(sql + (args.length > 0 ? "; " + args : ""));
client.query(sql, args, function(error, result) {
done();
@ -319,12 +320,12 @@ exports.executeAll = function(statements) {
return d.reject(error);
}
d.resolve(statements.map(function(statement, index) {
var statementPromises = statements.map(function(statement, index) {
var sd = when.defer();
var sql = typeof statement === "string" ? statement : statement.sql;
var args = typeof statement === "string" ? [] : (statement.args || []);
// console.log(/*sql + */(args.length > 0 ? "; " + args : ""));
// log.info(/*sql + */(args.length > 0 ? "; " + args : ""));
client.query(sql, args, function(error, result) {
if (index == last || error) {
@ -339,7 +340,11 @@ exports.executeAll = function(statements) {
});
return sd.promise;
}));
});
when.all(statementPromises).then(function(a) {
d.resolve(a);
});
});
return d.promise;
}

View File

@ -13,17 +13,15 @@
},
"engine": "node >= 0.10.12",
"dependencies": {
"winston": "0.7.2",
"underscore": "1.5.1",
"when": "2.2.1",
"express": "3.3.4",
"htmlparser": "1.7.6",
"iconv": "2.0.6",
"pg": "2.1.0",
"underscore": "1.5.1",
"when": "2.2.1"
"pg": "2.1.0"
},
"devDependencies": {
"nodeunit": "0.8.1"
},
"scripts": {
"start": "node server.js"
}
}

View File

@ -3,6 +3,8 @@
var when = require("when");
var http = require("http");
var htmlparser = require("htmlparser");
var tool = require("./tool");
var log = tool.log();
/**
* Converts the provided HTML text into a dom.
@ -88,17 +90,17 @@ exports.matchText = function(regex, dom) {
exports.fetch = function(options, converter) {
converter = converter || function nop(buffer) { return buffer; };
var d = when.defer();
console.log("get: " + options);
log.info("get: " + options);
http.get(options, function(response) {
var chunks = [];
response.on("data", function(chunk) {
chunks.push(chunk);
});
response.on("end", function() {
console.log("got: " + options);
log.info("got: " + options);
var converted = converter(Buffer.concat(chunks));
var parsed = parseHTML(converted);
console.log("done: " + options);
log.info("done: " + options);
d.resolve(parsed);
});
}).on("error", function(error) {

View File

@ -1,7 +1,7 @@
"use strict";
console.log("============================================================");
console.log("Starting " + new Date() + "...");
console.log(new Date().toISOString() + " - Starting");
var util = require("util");
var _ = require("underscore");
@ -13,6 +13,7 @@ var schema = require("./schema");
var api = require("./api");
var stationsData = require("./station-data");
var log = tool.log();
var iconvShiftJIStoUTF8 = new (require("iconv")).Iconv("SHIFT_JIS", "UTF-8//TRANSLIT//IGNORE");
var shiftJIStoUTF8 = iconvShiftJIStoUTF8.convert.bind(iconvShiftJIStoUTF8);
@ -117,11 +118,11 @@ function processP160Row(row, date) {
}
function processP160(dom) {
console.log("Processing P160...");
log.info("Processing P160...");
var tables = scraper.tablesOf(dom);
if (tables.length < 4) {
console.log("no data found");
log.error("no data found");
return null;
}
@ -134,7 +135,7 @@ function processP160(dom) {
}
function start() {
console.log("Preparing tables...");
log.info("Preparing tables...");
return persist([db.createTable(schema.stations), db.createTable(schema.samples)]);
}
@ -148,7 +149,7 @@ function persist(statements) {
if (!statements) {
return when.resolve(null);
}
console.log("Persisting...");
log.info("Persisting...");
return db.executeAll(statements);
}
@ -161,16 +162,39 @@ function doP160Page(page, date) {
function doP160(date) {
// return a promise for a boolean which is false if data was processed, and true if data was not available
// i.e., true == we are done.
var promises = [doP160Page(1, date), doP160Page(2, date)];
return when.reduce(
[doP160Page(1, date), doP160Page(2, date)],
promises,
function(current, value) {
return current && !value;
},
true);
}
function pollP160ForUpdates() {
// Return a promise for a boolean which is true if new data was found. New data is found if the database
// reports that rows have been inserted or updated after scraping both pages.
var promises = [doP160Page(1), doP160Page(2)];
return when.reduce(
promises,
function(current, value) {
var result = value && value[0] || {};
return current + result.rowCount;
},
0)
.then(function(rowCount) {
log.info("results of poll: rowCount = " + rowCount);
var success = rowCount >= 2; // ugh.
if (success) {
log.info("resetting query memos");
api.resetQueryMemos();
}
return success;
});
}
function doStationDetails() {
console.log("Preparing station details...");
log.info("Preparing station details...");
var statements = [];
_.keys(stationNames).forEach(function(name) {
statements.push(db.upsert(schema.stations, {id: stationNames[name], name: name}));
@ -189,7 +213,7 @@ function doStationDetails() {
}
function doP160Historical(hours) {
console.log("Starting P160 Historical...");
log.info("Starting P160 Historical...");
var now = new Date().getTime();
var dates = [];
for (var i = 1; i <= hours; i++) {
@ -205,21 +229,33 @@ function doP160Historical(hours) {
return function doAnotherDate(done) {
if (dates.length > 0 && !done) {
var date = dates.shift();
console.log(tool.format("Processing {0}... (remaining: {1})", date, dates.length));
log.info(tool.format("Processing {0}... (remaining: {1})", date, dates.length));
return doP160(date).then(wait).then(doAnotherDate);
}
else {
console.log("Finished P160 Historical");
log.info("Finished P160 Historical");
}
}(false);
}
function pollForUpdates() {
var ONE_MINUTE = 60 * 1000;
var ONE_HOUR = 60 * ONE_MINUTE;
// Wait an exponentially longer amount of time after each retry, up to 15 min.
function exponentialBackoff(t) {
return Math.min(Math.pow(2, t < 0 ? -(t + 1) : t), 15) * ONE_MINUTE;
}
// The air data is updated every hour, but we don't know exactly when. By specifying initialRetry = -2,
// the pages get scraped a little earlier than the estimated time. Eventually, the algorithm will center
// itself on the actual time, even if it varies a bit.
tool.setFlexInterval(pollP160ForUpdates, ONE_MINUTE, ONE_HOUR, exponentialBackoff, -2);
}
start()
.then(doP160.bind(undefined, null))
.then(doStationDetails)
.then(pollForUpdates)
.then(doP160Historical.bind(undefined, 0/*9 * 24*/)) // up to nine days of historical data available
.then(null, console.error);
setInterval(function update() {
doP160().then(api.resetQueryMemos).then(null, console.error);
}, 10 * 60 * 1000); // update every ten minutes
.then(null, function(e) { log.error(e.stack); });

71
tool.js
View File

@ -1,6 +1,21 @@
"use strict";
var _ = require("underscore");
var when = require("when");
var winston = require("winston");
/**
* Returns a new, nicely configured winston logger.
*
* @returns {winston.Logger}
*/
exports.log = function() {
return new (winston.Logger)({
transports: [
new (winston.transports.Console)({level: 'debug', timestamp: true, colorize: false})
]
});
}; var log = exports.log();
/**
* Returns the string representation of a number padded with leading characters to make
@ -110,3 +125,59 @@ exports.withZone = function(isoString, zone) {
return dateToISO(date, zone);
}
/**
* Repeatedly calls a function with the specified period, waiting 'initialDelay' milliseconds before the first
* invocation. If the function returns true, then the invocation for the next period is scheduled. If the function
* returns false, then function invocation is retried on a schedule determined by the backoff function.
*
* The backoff function is a function(i) that is invoked when the ith retry has failed, and returns the number
* of milliseconds to wait before attempting the next retry. For example, given a time t that corresponds to a
* period Px in which to invoke the function, i is initially 0. If the invocation at t fails, invocation is
* scheduled for backoff(0) milliseconds later, and i is incremented. If that invocation yet again fails, the next
* invocation is scheduled for backoff(1) milliseconds later, and i again increments. This continues until the
* function succeeds. Upon success, i becomes 0, and the next invocation is scheduled for the period Px+1.
*
* It is sometimes useful to initialize i to a negative value for each period Px. This has the effect of "retrying"
* the function _before_ the desired time, allowing the schedule to adapt to noisy environments where the period of
* some recurring event exhibits variation. For example, given an estimated time t when function invocation is
* expected to be successful, initializing i to -1 means function invocation first occurs at t - backoff(-1).
* Assuming this fails, the next attempt occurs at t (backoff(-1) milliseconds later), then again backoff(0) ms
* later, and so on as discussed earlier.
*
* This function returns a function that, when invoked, cancels all future invocations.
*
* @param funcToCall the function to invoke repeatedly.
* @param initialDelay the milliseconds to wait before the first invocation.
* @param period the desired milliseconds between subsequent invocations.
* @param backoff a function(i) that returns the number of milliseconds to wait after the ith retry fails.
* @param [initialRetry] the initial ith retry value for each period; defaults to 0.
* @returns {Function} when the returned function is invoked, all future invocations are canceled.
*/
exports.setFlexInterval = function(funcToCall, initialDelay, period, backoff, initialRetry) {
var done = false;
initialRetry = initialRetry || 0;
var i = initialRetry;
var start = 0;
for (var t = initialRetry; t < 0; t++) {
start += backoff(t);
}
function schedule(success) {
if (success) {
i = initialRetry;
}
var next = Math.max(0, success ? period - start : backoff(i++));
log.info("scheduling for: " + next);
setTimeout(invoke, next);
}
function invoke() {
if (!done) {
when(funcToCall()).then(schedule, function(e) { log.error(e.stack); });
}
}
setTimeout(invoke, initialDelay);
return function() { done = true; };
}