/*
  Copyright 2019 the JSDoc Authors.

  Licensed under the Apache License, Version 2.0 (the "License");
  you may not use this file except in compliance with the License.
  You may obtain a copy of the License at

    https://www.apache.org/licenses/LICENSE-2.0

  Unless required by applicable law or agreed to in writing, software
  distributed under the License is distributed on an "AS IS" BASIS,
  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  See the License for the specific language governing permissions and
  limitations under the License.
*/
import dependencyGraph from 'dependency-graph';
import Emittery from 'emittery';
import _ from 'lodash';
import ow from 'ow';
import Queue from 'p-queue';

import v from './validators.js';

const { DepGraph } = dependencyGraph;

// Work around an annoying typo in a method name.
DepGraph.prototype.dependentsOf = DepGraph.prototype.dependantsOf;

/**
 * Runs a set of named tasks, honoring the dependencies that each task declares in `dependsOn`.
 *
 * The runner re-emits task events as `taskStart`, `taskEnd`, and `taskError`, and emits `start`
 * and `end` for the run as a whole.
 */
export class TaskRunner extends Emittery {
  /**
   * @param {Object} [context] - Value passed as the first argument to each task's `run` method.
   */
  constructor(context) {
    super();

    ow(context, ow.optional.object);

    this._init(context);
  }

  /**
   * Applies `func` (bound to this runner) to every task in an array or in the values of an object.
   * If `func` throws, the error message is prefixed with the failing index or key, and the same
   * error object is rethrown.
   *
   * @param {Array|Object} tasks - Tasks to process. Any other value is silently ignored.
   * @param {Function} func - The method to call for each task.
   * @param {string} action - Verb used in the error-message prefix (for example, `add`).
   */
  _addOrRemoveTasks(tasks, func, action) {
    func = _.bind(func, this);

    if (Array.isArray(tasks)) {
      tasks.forEach((task, i) => {
        try {
          func(task);
        } catch (e) {
          e.message = `Can't ${action} task ${i}: ${e.message}`;
          throw e;
        }
      });
    } else if (tasks !== null && typeof tasks === 'object') {
      for (const task of Object.keys(tasks)) {
        try {
          func(tasks[task]);
        } catch (e) {
          e.message = `Can't ${action} task "${task}": ${e.message}`;
          throw e;
        }
      }
    }
  }

  /**
   * Subscribes to a task's `start`, `end`, and `error` events and forwards them as `taskStart`,
   * `taskEnd`, and `taskError`. The functions returned by `task.on()` are stored under the task's
   * name so that `removeTask()` can call them later.
   *
   * @param {Object} task - The task to listen to.
   */
  _addTaskEmitters(task) {
    const u = {};

    u.start = task.on('start', (t) => this.emit('taskStart', t));
    u.end = task.on('end', (t) => this.emit('taskEnd', t));
    u.error = task.on('error', (e) => {
      this.emit('taskError', {
        task: e.task,
        error: e.error,
      });

      // Only the first error is recorded.
      if (!this._error) {
        this._error = e.error;
      }
    });

    this._unsubscribers.set(task.name, u);
  }

  /**
   * @param {Object} task - The task whose `run` method should be bound.
   * @returns {Function} `task.run`, bound to the task, with the current context as its first
   * argument.
   */
  _bindTaskFunc(task) {
    return _.bind(task.run, task, this._context);
  }

  /**
   * Builds a function that runs the named tasks one after another by chaining promises.
   *
   * @param {Array<string>} tasks - Task names, in the order in which to run them.
   * @returns {?Function} A function that returns a promise for the whole chain, or `null` if
   * `tasks` is empty.
   */
  _createTaskSequence(tasks) {
    if (!tasks.length) {
      return null;
    }

    return () =>
      tasks.reduce((p, taskName) => {
        const task = this._nameToTask.get(taskName);

        // Once one link rejects, the rejection is passed along and later tasks are not started.
        return p.then(this._bindTaskFunc(task), (e) => Promise.reject(e));
      }, Promise.resolve());
  }

  /**
   * Resets all runner state: context, dependencies, recorded error, queue, task lookups, running
   * flag, and stored unsubscribe functions. The new queue starts out paused.
   *
   * @param {Object} [context] - The context to store.
   */
  _init(context) {
    this._context = context;
    this._deps = new Map();
    this._error = null;
    this._queue = new Queue();
    this._taskToName = new WeakMap();
    this._nameToTask = new Map();
    this._running = false;
    this._unsubscribers = new Map();

    this._queue.pause();
  }

  /**
   * @param {Array<string>} cyclePath - The task names that form the cycle.
   * @returns {Error} A `DependencyCycleError` describing the cycle.
   */
  _newDependencyCycleError(cyclePath) {
    return new v.DependencyCycleError(
      `Tasks have circular dependencies: ${cyclePath.join(' > ')}`,
      cyclePath
    );
  }

  /**
   * @returns {Error} A `StateError` saying that the runner is already running.
   */
  _newStateError() {
    return new v.StateError('The task runner is already running.');
  }

  /**
   * @param {string} dependent - Name of the task that declares the dependencies.
   * @param {Array<string>} unknownDeps - Names of the dependencies that are not registered.
   * @returns {Error} An `UnknownDependencyError` naming the dependent and the unknown tasks.
   */
  _newUnknownDepsError(dependent, unknownDeps) {
    let errorText;

    if (unknownDeps.length === 1) {
      errorText = 'an unknown task';
    } else {
      errorText = 'unknown tasks';
    }

    return new v.UnknownDependencyError(
      `The task ${dependent} depends on ${errorText}: ${unknownDeps.join(', ')}`
    );
  }

  /**
   * Builds a dependency graph from the registered tasks and splits the task names into two
   * groups.
   *
   * @returns {{error: (Error|undefined), parallel: (Array<string>|undefined),
   * sequential: (Array<string>|undefined)}} On success, `parallel` holds the tasks that have
   * neither dependencies nor dependents, and `sequential` holds all other tasks in dependency
   * order. On failure, `error` is set, and the lists may be `undefined`.
   */
  _orderTasks() {
    let error;
    const graph = new DepGraph();
    let parallel;
    let sequential;

    for (const [taskName] of this._nameToTask) {
      graph.addNode(taskName);
    }

    for (const [dependent, dependencies] of this._deps) {
      const unknownDeps = [];

      // Collect every unknown dependency of this task, so the error can list all of them.
      for (const dependency of dependencies) {
        if (this._nameToTask.has(dependency)) {
          graph.addDependency(dependent, dependency);
        } else {
          unknownDeps.push(dependency);
        }
      }

      if (unknownDeps.length) {
        // Report against the task that declares the dependencies, not the missing task.
        error = this._newUnknownDepsError(dependent, unknownDeps);
        break;
      }
    }

    if (!error) {
      try {
        // Get standalone tasks with no dependencies and no dependents.
        parallel = graph.overallOrder(true).filter((task) => !graph.dependentsOf(task).length);
        // Get tasks with dependencies, in a correctly ordered list.
        sequential = graph.overallOrder().filter((task) => !parallel.includes(task));
      } catch (e) {
        error = this._newDependencyCycleError(e.cyclePath);
      }
    }

    return {
      error,
      parallel,
      sequential,
    };
  }

  /**
   * @returns {?Promise} A promise rejected with a `StateError` if the runner is running;
   * otherwise, `null`.
   */
  _rejectIfRunning() {
    if (this.running) {
      return Promise.reject(this._newStateError());
    }

    return null;
  }

  /**
   * @throws {Error} A `StateError` if the runner is running.
   */
  _throwIfRunning() {
    if (this.running) {
      throw this._newStateError();
    }
  }

  /**
   * @param {string} dependent - Name of the task that declares the dependencies.
   * @param {Array<string>} unknownDeps - Names of the unknown dependencies.
   * @throws {Error} An `UnknownDependencyError` if `unknownDeps` is not empty.
   */
  _throwIfUnknownDeps(dependent, unknownDeps) {
    if (!unknownDeps.length) {
      return;
    }

    throw this._newUnknownDepsError(dependent, unknownDeps);
  }

  /**
   * Registers a task under its name, records its `dependsOn` value (if any), and starts
   * forwarding its events.
   *
   * @param {Object} task - The task to add.
   * @returns {this} The runner, for chaining.
   * @throws {Error} If validation fails, or if the runner is running.
   */
  addTask(task) {
    ow(task, v.checkTaskOrString);
    this._throwIfRunning();

    this._nameToTask.set(task.name, task);
    if (task.dependsOn) {
      this._deps.set(task.name, task.dependsOn);
    }
    this._taskToName.set(task, task.name);
    this._addTaskEmitters(task);

    return this;
  }

  /**
   * Adds every task in an array, or every value of an object.
   *
   * @param {Array|Object} tasks - The tasks to add.
   * @returns {this} The runner, for chaining.
   */
  addTasks(tasks) {
    ow(tasks, ow.any(ow.array, ow.object));
    this._addOrRemoveTasks(tasks, this.addTask, 'add');

    return this;
  }

  /**
   * Emits `end` with the recorded error, clears the queue, and reinitializes the runner.
   *
   * Reinitializing passes no context, so the stored context, the registered tasks, their
   * dependencies, and the recorded error are all discarded.
   *
   * NOTE(review): The stored unsubscribe functions are discarded without being called, so tasks
   * that were registered keep their listeners. Confirm whether that is intended.
   */
  end() {
    this.emit('end', {
      error: this._error,
    });
    this._queue.clear();
    this._init();
  }

  /**
   * Removes a registered task, along with its dependencies and its event subscriptions.
   *
   * @param {Object|string} task - The task, or the name of the task, to remove.
   * @returns {this} The runner, for chaining.
   * @throws {Error} An `UnknownTaskError` if the task is not registered; a `StateError` if the
   * runner is running.
   */
  removeTask(task) {
    let target = task;

    ow(task, v.checkTaskOrString);
    this._throwIfRunning();

    if (typeof task === 'string') {
      target = this._nameToTask.get(task);

      if (!target) {
        // Use the requested name; the lookup result is `undefined` here.
        throw new v.UnknownTaskError(`Unknown task: ${task}`);
      }
    } else if (typeof task === 'object') {
      if (!this._taskToName.has(task)) {
        // Prefer the task's name over the default object stringification.
        const label = task && task.name ? task.name : String(task);

        throw new v.UnknownTaskError(`Unknown task: ${label}`);
      }
    }

    this._nameToTask.delete(target.name);
    this._taskToName.delete(target);
    this._deps.delete(target.name);

    // Stop forwarding the task's events.
    const unsubscribers = this._unsubscribers.get(target.name);

    for (const u of Object.keys(unsubscribers)) {
      unsubscribers[u]();
    }
    this._unsubscribers.delete(target.name);

    return this;
  }

  /**
   * Removes every task in an array, or every value of an object.
   *
   * @param {Array|Object} tasks - The tasks, or task names, to remove.
   * @returns {this} The runner, for chaining.
   */
  removeTasks(tasks) {
    ow(tasks, ow.any(ow.array, ow.object));
    this._addOrRemoveTasks(tasks, this.removeTask, 'remove');

    return this;
  }

  /**
   * Orders the registered tasks, adds them to the queue, and starts the queue. Emits `start`
   * before the queue starts; `end()` is called when the queued work settles.
   *
   * @param {Object} [context] - Replaces the stored context when truthy.
   * @returns {Promise} A promise that is fulfilled when all tasks finish without an error. It is
   * rejected if the runner is already running, if the tasks cannot be ordered, if the queued
   * work rejects, or if a task reported an error through its `error` event.
   */
  run(context) {
    ow(context, ow.optional.object);

    const { error, parallel, sequential } = this._orderTasks();

    // First, fail if the runner is already running.
    const runningPromise = this._rejectIfRunning();

    if (runningPromise) {
      return runningPromise;
    }

    // Then fail if the tasks couldn't be ordered.
    if (error) {
      return Promise.reject(error);
    }

    this._context = context || this._context;

    // Standalone tasks are queued individually; dependent tasks are queued as one chained
    // function.
    const taskFuncs = parallel.map((taskName) =>
      this._bindTaskFunc(this._nameToTask.get(taskName))
    );
    const taskSequence = this._createTaskSequence(sequential);

    if (taskSequence) {
      taskFuncs.push(taskSequence);
    }

    const endPromise = this._queue.addAll(taskFuncs).then(
      () => {
        // `end()` reinitializes the runner, which clears the recorded error. Capture the error
        // first so that it can still cause a rejection.
        const taskError = this._error;

        this.end();

        if (taskError) {
          return Promise.reject(taskError);
        } else {
          return Promise.resolve();
        }
      },
      (e) => {
        this.end();

        return Promise.reject(e);
      }
    );

    this.emit('start');
    this._running = true;

    try {
      this._queue.start();

      return endPromise;
    } catch (e) {
      this._error = e;
      this.end();

      return Promise.reject(e);
    }
  }

  /**
   * Whether the runner is currently running.
   *
   * @type {boolean}
   */
  get running() {
    return this._running;
  }

  /**
   * A new plain object that maps each registered task name to its task.
   *
   * @type {Object}
   */
  get tasks() {
    const entries = [];

    for (const entry of this._nameToTask.entries()) {
      entries.push(entry);
    }

    return _.fromPairs(entries);
  }
}