mirror of
https://github.com/cambecc/air.git
synced 2025-12-08 21:26:22 +00:00
restructure the way when.js is used
This commit is contained in:
parent
85c2257301
commit
4ddda4aa16
7
db.js
7
db.js
@ -178,8 +178,7 @@ function stationIdConstraint(constraints) {
|
||||
* Builds a sql constraint clause for sample type, if necessary.
|
||||
*/
|
||||
function sampleTypeConstraint(constraints) {
|
||||
return null;
|
||||
// return constraints.sampleType ? quoteName(constraints.sampleType) + ' IS NOT NULL' : null;
|
||||
return /*constraints.sampleType ? quoteName(constraints.sampleType) + ' IS NOT NULL' :*/ null;
|
||||
}
|
||||
|
||||
/**
|
||||
@ -247,7 +246,7 @@ exports.execute = function(statement) {
|
||||
var sql = typeof statement === 'string' ? statement : statement.sql;
|
||||
var args = typeof statement === 'string' ? [] : (statement.args || []);
|
||||
|
||||
console.log(sql + (args.length > 0 ? '; ' + args : ''));
|
||||
// console.log(sql + (args.length > 0 ? '; ' + args : ''));
|
||||
|
||||
client.query(sql, args, function(error, result) {
|
||||
done();
|
||||
@ -286,7 +285,7 @@ exports.executeAll = function(statements) {
|
||||
var sql = typeof statement === 'string' ? statement : statement.sql;
|
||||
var args = typeof statement === 'string' ? [] : (statement.args || []);
|
||||
|
||||
console.log(/*sql + */(args.length > 0 ? '; ' + args : ''));
|
||||
// console.log(/*sql + */(args.length > 0 ? '; ' + args : ''));
|
||||
|
||||
client.query(sql, args, function(error, result) {
|
||||
if (index == last || error) {
|
||||
|
||||
67
server.js
67
server.js
@ -3,7 +3,6 @@
|
||||
var util = require('util');
|
||||
var _ = require('underscore');
|
||||
var when = require('when');
|
||||
var pipeline = require('when/pipeline');
|
||||
var db = require('./db');
|
||||
var scraper = require('./scraper');
|
||||
var tool = require('./tool');
|
||||
@ -147,7 +146,7 @@ function processP160Row(row, date) {
|
||||
}
|
||||
|
||||
function processP160(dom) {
|
||||
console.log('[ZZZ] Processing P160...');
|
||||
console.log('Processing P160...');
|
||||
|
||||
var tables = scraper.tablesOf(dom);
|
||||
var header = scraper.extract(tables[2])[0];
|
||||
@ -159,11 +158,10 @@ function processP160(dom) {
|
||||
}
|
||||
|
||||
function start() {
|
||||
console.log('[ZZZ] Starting...');
|
||||
return when.all([
|
||||
console.log('Preparing tables...');
|
||||
return when.join(
|
||||
db.execute(db.createTable(stationsTable)),
|
||||
db.execute(db.createTable(samplesTable))
|
||||
]);
|
||||
db.execute(db.createTable(samplesTable)));
|
||||
}
|
||||
|
||||
function convertShiftJIStoUTF8(buffer) {
|
||||
@ -177,19 +175,26 @@ function scrapeP160(page, date) {
|
||||
}
|
||||
|
||||
function persist(statements) {
|
||||
console.log('[ZZZ] Persisting...');
|
||||
console.log('Persisting...');
|
||||
return db.executeAll(statements);
|
||||
}
|
||||
|
||||
function doP160() {
|
||||
console.log('[ZZZ] Starting P160...');
|
||||
return when.all([
|
||||
pipeline([scrapeP160, processP160, persist], 1),
|
||||
pipeline([scrapeP160, processP160, persist], 2)]);
|
||||
function doP160Page(page, date) {
|
||||
return scrapeP160(page, date)
|
||||
.then(processP160)
|
||||
.then(persist);
|
||||
}
|
||||
|
||||
function doP160(date) {
|
||||
return when.join(doP160Page(1, date), doP160Page(2, date));
|
||||
}
|
||||
|
||||
function doP160Now() {
|
||||
return doP160(null);
|
||||
}
|
||||
|
||||
function doStationDetails() {
|
||||
console.log('[ZZZ] Preparing station details.');
|
||||
console.log('Preparing station details...');
|
||||
var statements = [];
|
||||
_.keys(stationNames).forEach(function(name) {
|
||||
statements.push(db.upsert(stationsTable, {id: stationNames[name], name: name}));
|
||||
@ -198,17 +203,31 @@ function doStationDetails() {
|
||||
}
|
||||
|
||||
function doP160Historical() {
|
||||
console.log('[ZZZ] Starting P160 Historical...');
|
||||
var date = new Date().getTime();
|
||||
var results = [];
|
||||
for (var i = 0; i <= 10; i++) {
|
||||
var t = new Date(date - (i * 3600000));
|
||||
results.push(
|
||||
when.all([
|
||||
pipeline([scrapeP160, processP160, persist], 1, t),
|
||||
pipeline([scrapeP160, processP160, persist], 2, t)]));
|
||||
console.log('Starting P160 Historical...');
|
||||
var now = new Date().getTime();
|
||||
var dates = [];
|
||||
var hours = 5;
|
||||
for (var i = 1; i <= hours; i++) {
|
||||
dates.push(new Date(now - (i * 60 * 60 * 1000)));
|
||||
}
|
||||
return results;
|
||||
|
||||
function wait() {
|
||||
var d = when.defer();
|
||||
setTimeout(function() { d.resolve(); }, 5000);
|
||||
return d.promise;
|
||||
}
|
||||
|
||||
return function doAnotherDate() {
|
||||
if (dates.length > 0) {
|
||||
var date = dates.shift();
|
||||
console.log(tool.format('Processing {0}... (remaining: {1})', date, dates.length));
|
||||
return doP160(date).then(wait).then(doAnotherDate);
|
||||
}
|
||||
}();
|
||||
}
|
||||
|
||||
when(start()).then(pipeline([doP160, doStationDetails/*, doP160Historical*/]), console.error);
|
||||
start()
|
||||
.then(doP160Now)
|
||||
.then(doStationDetails)
|
||||
.then(doP160Historical)
|
||||
.then(null, console.error);
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user