/**
 * server - the main "air" server application.
 *
 * This module starts the database, scraper, and api components. It also processes the scraped data and
 * sets up periodic scrapes. Could probably be much cleaner...
 */

"use strict";

console.log("============================================================");
console.log(new Date().toISOString() + " - Starting");

var util = require("util");
var _ = require("underscore");
var when = require("when");
var db = require("./db");
var scraper = require("./scraper");
var tool = require("./tool");
var schema = require("./schema");
var api = require("./api");
var stationsData = require("./station-data");

var log = tool.log();
var iconvShiftJIStoUTF8 = new (require("iconv")).Iconv("SHIFT_JIS", "UTF-8//TRANSLIT//IGNORE");
var shiftJIStoUTF8 = iconvShiftJIStoUTF8.convert.bind(iconvShiftJIStoUTF8);

var scrapeURL = process.argv[4];
var stationNames = {};

function extractP160DateTime(dom) {
    var parts = scraper.matchText(/−(.*)年(.*)月(.*)日(.*)時.*−/, dom)[0];
    return tool.toISOString({year: parts[1], month: parts[2], day: parts[3], hour: parts[4], zone: "+09:00"});
}

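// For reference, extractP160DateTime above parses a page header of the form "−YYYY年MM月DD日HH時−"
// (e.g., "−2013年12月8日14時−") into an ISO 8601 timestamp. The source reports Japan Standard Time,
// hence the fixed "+09:00" zone.
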
function cardinalToDegrees(s) {
    switch (s) {
        case "N": return 0;
        case "NNE": return 22.5;
        case "NE": return 45;
        case "ENE": return 67.5;
        case "E": return 90;
        case "ESE": return 112.5;
        case "SE": return 135;
        case "SSE": return 157.5;
        case "S": return 180;
        case "SSW": return 202.5;
        case "SW": return 225;
        case "WSW": return 247.5;
        case "W": return 270;
        case "WNW": return 292.5;
        case "NW": return 315;
        case "NNW": return 337.5;
        case "C": return 360;  // calm; map to 360 to distinguish from 0 (N) and null (no sample)
        default: return s;
    }
}

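// The sixteen compass points above are spaced 22.5 degrees apart, e.g., cardinalToDegrees("ENE")
// returns 67.5. Anything unrecognized is passed through unchanged.
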
function addTag(target, tag, value) {
    value = (value || "").trim();
    if (value.length > 0 && value !== "-" && value !== " ") {
        var scale = 1;
        switch (tag) {
            case "temp": scale = 0.1; break;  // 0.1 deg C -> deg C
            case "hum": scale = 0.1; break;   // 0.1% -> 1%
            case "wv":
                value = (value == "C" ? 0 : value);
                scale = 0.1;  // 0.1 m/s -> 1 m/s
                break;
            case "wd":
                value = cardinalToDegrees(value);
                break;
            case "in": scale = 0.01; break;    // 0.01 MJ/m2 -> MJ/m2
            case "no": scale = 0.001; break;   // mm3/m3 -> cm3/m3
            case "no2": scale = 0.001; break;  // mm3/m3 -> cm3/m3
            case "nox": scale = 0.001; break;  // mm3/m3 -> cm3/m3
            case "ox": scale = 0.001; break;   // mm3/m3 -> cm3/m3
            case "so2": scale = 0.001; break;  // mm3/m3 -> cm3/m3
            case "co": scale = 0.1; break;     // 0.1 cm3/m3 -> cm3/m3
            case "ch4": scale = 0.01; break;   // 10 mm3/m3 -> cm3/m3
            case "nmhc": scale = 0.01; break;  // 10 mm3/m3 -> cm3/m3
            case "spm": break;   // μg/m3
            case "pm25": break;  // μg/m3
        }
        target[tag] = value * scale;
    }
    return target;
}

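// Illustrative examples of the scaling above (raw values hypothetical):
//
//     addTag({}, "temp", "215");  // -> {temp: 21.5}   tenths of a degree become deg C
//     addTag({}, "wd", "NNE");    // -> {wd: 22.5}     cardinal direction becomes degrees
//     addTag({}, "no2", "12");    // -> {no2: 0.012}   mm3/m3 becomes cm3/m3
//     addTag({}, "so2", "-");     // -> {}             missing samples are skipped entirely
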
function validateP160Header(header) {
    var expected =
        "番号,局番,局名,SO2,ppb,Ox,ppb,NO,ppb,NO2,ppb,NOx,ppb,CO,0.1ppm,SPM,μg/m3,NMHC," +
        "pphmC,CH4,pphmC,PM2.5,μg/m3,風向,風速,0.1m/s,気温,0.1度,湿度,0.1%,日射量,0.01MJ/m2";
    var actual = "" + header;
    if (actual !== expected) {
        // Throw an Error (not a bare string) so the top-level handler can log a stack trace.
        throw new Error("Expected P160 header: \n " + expected + "\nbut found:\n " + actual);
    }
}

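// Translation of the expected header columns: 番号 = row number, 局番 = station id, 局名 = station
// name, 風向 = wind direction, 風速 = wind speed, 気温 = temperature, 湿度 = humidity,
// 日射量 = solar radiation; the remaining columns are the pollutant measurements and their units.
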
function processP160Row(row, date, counts) {
    /*  0    1    2    3    4   5   6    7    8   9    10    11   12     13   14   15   16   17
       番号  局番  局名  SO2  Ox  NO  NO2  NOx  CO  SPM  NMHC  CH4  PM2.5  風向  風速  気温  湿度  日射量 */
    var stationId = row[1] * 1;
    var stationName = row[2];

    stationNames[stationName] = stationId;

    var item = { stationId: stationId, stationName: stationName, date: date };
    addTag(item, "so2", row[3]);
    addTag(item, "ox", row[4]);
    addTag(item, "no", row[5]);
    addTag(item, "no2", row[6]);
    addTag(item, "nox", row[7]);
    addTag(item, "co", row[8]);
    addTag(item, "spm", row[9]);
    addTag(item, "nmhc", row[10]);
    addTag(item, "ch4", row[11]);
    addTag(item, "pm25", row[12]);
    addTag(item, "wd", row[13]);
    addTag(item, "wv", row[14]);
    addTag(item, "temp", row[15]);
    addTag(item, "hum", row[16]);
    addTag(item, "in", row[17]);

    if (item.hasOwnProperty("wd") && item.hasOwnProperty("wv")) {
        // Increment each time we find a row with wind data.
        counts.windData += 1;
    }

    return db.upsert(schema.samples, item);
}

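// A fully populated sample produced above might look like the following (values hypothetical):
//
//     {stationId: 101, stationName: "...", date: "2013-12-08T14:00:00+09:00", so2: 0.002,
//      ox: 0.04, no2: 0.012, co: 0.3, spm: 21, pm25: 14, wd: 225, wv: 3.4, temp: 8.5, hum: 43.2}
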
function processP160(dom) {
    log.info("Processing P160...");

    var tables = scraper.tablesOf(dom);
    if (tables.length < 4) {
        log.error("no data found");
        return null;
    }

    var header = scraper.extract(tables[2])[0];  // table at index two is the header
    validateP160Header(header);

    var date = extractP160DateTime(dom);
    var rows = scraper.extract(tables[3]);  // table at index three is the data
    var counts = {windData: 0};
    var statements = rows.map(function(row) { return processP160Row(row, date, counts); });
    if (counts.windData < 5) {
        log.error("insufficient wind data found: " + counts.windData);  // the table we're scraping is probably incomplete
        return null;
    }
    return statements;
}

function start() {
    log.info("Preparing tables...");
    return persist([db.createTable(schema.stations), db.createTable(schema.samples)]);
}

function scrapeP160(page, date) {
    date = date ? Math.floor(date.getTime() / 1000) : "";
    var url = tool.format("{0}/p160.cgi?no2=={1}={2}==2====2=", scrapeURL, date, page);
    return scraper.fetch(url, shiftJIStoUTF8);
}

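// For example, with scrapeURL "http://example.jp" (hypothetical) and no date, page 2 resolves to
// "http://example.jp/p160.cgi?no2===2==2====2="; an empty timestamp appears to request the latest data.
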
function persist(statements) {
    if (!statements) {
        return when.resolve(null);
    }
    log.info("Persisting...");
    return db.executeAll(statements);
}

function doP160Page(page, date) {
    return scrapeP160(page, date)
        .then(processP160)
        .then(persist);
}

function doP160(date) {
    // Returns a promise for a boolean that is false if data was processed and true if data was
    // not available, i.e., true == we are done.
    // CONSIDER: This function's behavior is subtle and confusing. Improve.
    var promises = [doP160Page(1, date), doP160Page(2, date)];
    return when.reduce(
        promises,
        function(current, value) {
            return current && !value;
        },
        true);
}

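// The reduce above resolves to true only when every page yields a falsy result: processP160 returns
// null when a page holds no usable data, persist passes the null through, and !value is then true.
// Any page that produced statements makes the result false, i.e., "not done; keep scraping".
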
function pollP160ForUpdates() {
    // Returns a promise for a boolean which is true if new data was found. New data is found if the
    // database reports that rows have been inserted or updated after scraping both pages.
    // CONSIDER: This function's behavior is subtle and confusing. Improve.
    var promises = [doP160Page(1), doP160Page(2)];

    function sumRowCounts(current, value) {
        if (value) {
            value.forEach(function(result) {
                current += result.rowCount;  // abstraction leakage -- relying on rowCount to exist
            });
        }
        return current;
    }

    return when.reduce(promises, sumRowCounts, 0).then(
        function(rowsInserted) {
            log.info("results of poll: rowsInserted = " + rowsInserted);
            // Expect at least 60 samples, otherwise the scrape was not successful. Ugh.
            var foundNewData = rowsInserted >= 60;
            if (foundNewData) {
                log.info("resetting query memos");
                api.resetQueryMemos();
            }
            return foundNewData;
        });
}

function doStationDetails() {
    log.info("Preparing station details...");
    var statements = [];
    _.keys(stationNames).forEach(function(name) {
        statements.push(db.upsert(schema.stations, {id: stationNames[name], name: name}));
    });
    stationsData.forEach(function(station) {
        var row = {
            id: station[0],
            name: station[1],
            address: station[2],
            latitude: station[3],
            longitude: station[4]
        };
        statements.push(db.upsert(schema.stations, row));
    });
    return persist(statements);
}

function doP160Historical(hours) {
    log.info("Starting P160 Historical...");
    var now = new Date().getTime();
    var dates = [];
    for (var i = 1; i <= hours; i++) {
        dates.push(new Date(now - (i * 60 * 60 * 1000)));
    }

    function wait(x) {
        var d = when.defer();
        setTimeout(function() { d.resolve(x); }, 3000);
        return d.promise;
    }

    return function doAnotherDate(done) {
        if (dates.length > 0 && !done) {
            var date = dates.shift();
            log.info(tool.format("Processing {0}... (remaining: {1})", date, dates.length));
            return doP160(date).then(wait).then(doAnotherDate);
        }
        else {
            log.info("Finished P160 Historical");
        }
    }(false);
}

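// doP160Historical walks backwards one hour at a time, pausing three seconds between scrapes, and
// stops early as soon as doP160 reports that a date had no data (done === true).
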
/**
 * Look for new air data every hour.
 */
function pollForUpdates() {
    var ONE_SECOND = 1000;
    var ONE_MINUTE = 60 * ONE_SECOND;
    var ONE_HOUR = 60 * ONE_MINUTE;

    // Wait an exponentially longer amount of time after each retry, up to 8 min.
    function exponentialBackoff(t) {
        return Math.min(Math.pow(2, t < 0 ? -(t + 1) : t), 8) * ONE_MINUTE;
    }

    // The air data is updated every hour, but we don't know exactly when. By specifying initialRetry = -1,
    // the pages get scraped a little earlier than the estimated time. Eventually, the algorithm will center
    // itself on the actual time, even if it varies a bit.
    tool.setFlexInterval(pollP160ForUpdates, ONE_MINUTE, ONE_HOUR, exponentialBackoff, -1);
}

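// exponentialBackoff yields delays of 1, 2, 4, 8, 8, ... minutes for t = 0, 1, 2, 3, 4, ...;
// negative retry counts mirror the positive ones, e.g., t = -1 -> 1 minute, t = -2 -> 2 minutes.
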
start()
    .then(doP160.bind(undefined, null))
    .then(doStationDetails)
    .then(pollForUpdates)
    .then(doP160Historical.bind(undefined, 0/*9 * 24*/))  // up to nine days of historical data available
    .then(null, function(e) { log.error(e.stack); });