328 lines
8.3 KiB
JavaScript

const _ = require('lodash');
const { DepGraph } = require('dependency-graph');
const Emittery = require('emittery');
const ow = require('ow');
const Queue = require('p-queue').default;
const v = require('./validators');
// Work around an annoying typo in a method name.
DepGraph.prototype.dependentsOf = DepGraph.prototype.dependantsOf;
module.exports = class TaskRunner extends Emittery {
constructor(context) {
super();
ow(context, ow.optional.object);
this._init(context);
}
_addOrRemoveTasks(tasks, func, action) {
func = _.bind(func, this);
if (Array.isArray(tasks)) {
tasks.forEach((task, i) => {
try {
func(task);
} catch (e) {
e.message = `Can't ${action} task ${i}: ${e.message}`;
throw e;
}
});
} else if (tasks !== null && typeof tasks === 'object') {
for (const task of Object.keys(tasks)) {
try {
func(tasks[task]);
} catch (e) {
e.message = `Can't ${action} task "${task}": ${e.message}`;
throw e;
}
}
}
}
_addTaskEmitters(task) {
const u = {};
u.start = task.on('start', t => this.emit('taskStart', t));
u.end = task.on('end', t => this.emit('taskEnd', t));
u.error = task.on('error', (e => {
this.emit('taskError', {
task: e.task,
error: e.error
});
if (!this._error) {
this._error = e.error;
}
}));
this._unsubscribers.set(task.name, u);
}
_bindTaskFunc(task) {
return _.bind(task.run, task, this._context);
}
_createTaskSequence(tasks) {
if (!tasks.length) {
return null;
}
return () => tasks.reduce((p, taskName) => {
const task = this._nameToTask.get(taskName);
return p.then(
this._bindTaskFunc(task),
e => Promise.reject(e)
);
}, Promise.resolve());
}
_init(context) {
this._context = context;
this._deps = new Map();
this._error = null;
this._queue = new Queue();
this._taskToName = new WeakMap();
this._nameToTask = new Map();
this._running = false;
this._unsubscribers = new Map();
this._queue.pause();
}
_newDependencyCycleError(cyclePath) {
return new v.DependencyCycleError(
`Tasks have circular dependencies: ${cyclePath.join(' > ')}`,
cyclePath
);
}
_newStateError() {
return new v.StateError('The task runner is already running.');
}
_newUnknownDepsError(dependent, unknownDeps) {
let errorText;
if (unknownDeps.length === 1) {
errorText = 'an unknown task';
} else {
errorText = 'unknown tasks';
}
return new v.UnknownDependencyError(`The task ${dependent} depends on ${errorText}: ` +
`${unknownDeps.join(', ')}`);
}
_orderTasks() {
let error;
const graph = new DepGraph();
let parallel;
let sequential;
for (const [task] of this._nameToTask) {
graph.addNode(task);
}
for (const [dependent] of this._deps) {
const unknownDeps = [];
for (const dependency of this._deps.get(dependent)) {
if (!this._nameToTask.has(dependency)) {
unknownDeps.push(dependency);
} else {
graph.addDependency(dependent, dependency);
}
if (unknownDeps.length) {
error = this._newUnknownDepsError(dependency, unknownDeps);
break;
}
}
}
if (!error) {
try {
// Get standalone tasks with no dependencies and no dependents.
parallel = graph.overallOrder(true)
.filter(task => !(graph.dependentsOf(task).length));
// Get tasks with dependencies, in a correctly ordered list.
sequential = graph.overallOrder().filter(task => !parallel.includes(task));
} catch (e) {
error = this._newDependencyCycleError(e.cyclePath);
}
}
return {
error,
parallel,
sequential
};
}
_rejectIfRunning() {
if (this.running) {
return Promise.reject(this._newStateError());
}
return null;
}
_throwIfRunning() {
if (this.running) {
throw this._newStateError();
}
}
_throwIfUnknownDeps(dependent, unknownDeps) {
if (!unknownDeps.length) {
return;
}
throw this._newUnknownDepsError(dependent, unknownDeps);
}
addTask(task) {
ow(task, v.checkTaskOrString);
this._throwIfRunning();
this._nameToTask.set(task.name, task);
if (task.dependsOn) {
this._deps.set(task.name, task.dependsOn);
}
this._taskToName.set(task, task.name);
this._addTaskEmitters(task);
return this;
}
addTasks(tasks) {
ow(tasks, ow.any(ow.array, ow.object));
this._addOrRemoveTasks(tasks, this.addTask, 'add');
return this;
}
end() {
this.emit('end', {
error: this._error
});
this._queue.clear();
this._init();
}
removeTask(task) {
let unsubscribers;
ow(task, v.checkTaskOrString);
this._throwIfRunning();
if (typeof task === 'string') {
task = this._nameToTask.get(task);
if (!task) {
throw new v.UnknownTaskError(`Unknown task: ${task}`);
}
} else if (typeof task === 'object') {
if (!this._taskToName.has(task)) {
throw new v.UnknownTaskError(`Unknown task: ${task}`);
}
}
this._nameToTask.delete(task.name);
this._taskToName.delete(task);
this._deps.delete(task.name);
unsubscribers = this._unsubscribers.get(task.name);
for (const u of Object.keys(unsubscribers)) {
unsubscribers[u]();
}
this._unsubscribers.delete(task.name);
return this;
}
removeTasks(tasks) {
ow(tasks, ow.any(ow.array, ow.object));
this._addOrRemoveTasks(tasks, this.removeTask, 'remove');
return this;
}
run(context) {
ow(context, ow.optional.object);
let endPromise;
const { error, parallel, sequential } = this._orderTasks();
let runningPromise;
let taskFuncs = [];
let taskSequence;
// First, fail if the runner is already running.
runningPromise = this._rejectIfRunning();
if (runningPromise) {
return runningPromise;
}
// Then fail if the tasks couldn't be ordered.
if (error) {
return Promise.reject(error);
}
this._context = context || this._context;
for (const taskName of parallel) {
taskFuncs.push(this._bindTaskFunc(this._nameToTask.get(taskName)));
}
taskSequence = this._createTaskSequence(sequential);
if (taskSequence) {
taskFuncs.push(taskSequence);
}
endPromise = this._queue.addAll(taskFuncs).then(() => {
this.end();
if (this._error) {
return Promise.reject(this._error);
} else {
return Promise.resolve();
}
}, e => {
this.end();
return Promise.reject(e);
});
this.emit('start');
this._running = true;
try {
this._queue.start();
return endPromise;
} catch (e) {
this._error = e;
this.end();
return Promise.reject(e);
}
}
get running() {
return this._running;
}
get tasks() {
const entries = [];
for (const entry of this._nameToTask.entries()) {
entries.push(entry);
}
return _.fromPairs(entries);
}
};