fix(pool): terminate workers on CTRL+c forceful exits (#9140)

This commit is contained in:
Ari Perkkiö 2025-12-02 13:56:30 +02:00 committed by GitHub
parent fa34701d25
commit d57d8bf0a6
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 100 additions and 9 deletions

View File

@ -23,7 +23,7 @@ interface QueuedTask {
} }
interface ActiveTask extends QueuedTask { interface ActiveTask extends QueuedTask {
cancelTask: () => Promise<void> cancelTask: (options?: { force: boolean }) => Promise<void>
} }
export class Pool { export class Pool {
@ -80,7 +80,11 @@ export class Pool {
this.activeTasks.push(activeTask) this.activeTasks.push(activeTask)
// active tasks receive cancel signal and shut down gracefully // active tasks receive cancel signal and shut down gracefully
async function cancelTask() { async function cancelTask(options?: { force: boolean }) {
if (options?.force) {
await runner.stop({ force: true })
}
await runner.waitForTerminated() await runner.waitForTerminated()
resolver.reject(new Error('Cancelled')) resolver.reject(new Error('Cancelled'))
} }
@ -171,6 +175,10 @@ export class Pool {
} }
async cancel(): Promise<void> { async cancel(): Promise<void> {
// Force exit if previous cancel is still on-going
// for example when user does 'CTRL+c' twice in row
const force = this._isCancelling
// Set flag to prevent new tasks from being queued // Set flag to prevent new tasks from being queued
this._isCancelling = true this._isCancelling = true
@ -181,13 +189,14 @@ export class Pool {
pendingTasks.forEach(task => task.resolver.reject(error)) pendingTasks.forEach(task => task.resolver.reject(error))
} }
const activeTasks = this.activeTasks.splice(0) await Promise.all(this.activeTasks.map(task => task.cancelTask({ force })))
await Promise.all(activeTasks.map(task => task.cancelTask())) this.activeTasks = []
const sharedRunners = this.sharedRunners.splice(0) await Promise.all(this.sharedRunners.map(runner => runner.stop()))
await Promise.all(sharedRunners.map(runner => runner.stop())) this.sharedRunners = []
await Promise.all(this.exitPromises.splice(0)) await Promise.all(this.exitPromises)
this.exitPromises = []
this.workerIds.forEach((_, id) => this.freeWorkerId(id)) this.workerIds.forEach((_, id) => this.freeWorkerId(id))

View File

@ -19,6 +19,21 @@ enum RunnerState {
STOPPED = 'stopped', STOPPED = 'stopped',
} }
interface StopOptions {
/**
* **Do not use unless you have good reason to.**
*
* Indicates whether to skip waiting for worker's response for `{ type: 'stop' }` message or not.
* By default `.stop()` terminates the workers gracefully by sending them stop-message
* and waiting for workers response, so that workers can do proper teardown.
*
* Force exit is used when user presses `CTRL+c` twice in row and intentionally does
* non-graceful exit. For example in cases where worker is stuck on synchronous thread
* blocking function call and it won't response to `{ type: 'stop' }` messages.
*/
force: boolean
}
const START_TIMEOUT = 60_000 const START_TIMEOUT = 60_000
const STOP_TIMEOUT = 60_000 const STOP_TIMEOUT = 60_000
@ -218,7 +233,7 @@ export class PoolRunner {
} }
} }
async stop(): Promise<void> { async stop(options?: StopOptions): Promise<void> {
// Wait for any ongoing operation to complete // Wait for any ongoing operation to complete
if (this._operationLock) { if (this._operationLock) {
await this._operationLock await this._operationLock
@ -263,6 +278,11 @@ export class PoolRunner {
} }
} }
// Don't wait for graceful exit's response when force exiting
if (options?.force) {
return onStop({ type: 'stopped', __vitest_worker_response__: true })
}
this.on('message', onStop) this.on('message', onStop)
this.postMessage({ this.postMessage({
type: 'stop', type: 'stop',

View File

@ -26,6 +26,8 @@ if (isProfiling) {
processOn('SIGTERM', () => processExit()) processOn('SIGTERM', () => processExit())
} }
processOn('error', onError)
export default function workerInit(options: { export default function workerInit(options: {
runTests: (method: 'run' | 'collect', state: WorkerGlobalState, traces: Traces) => Promise<void> runTests: (method: 'run' | 'collect', state: WorkerGlobalState, traces: Traces) => Promise<void>
setup?: (context: WorkerSetupContext) => Promise<() => Promise<unknown>> setup?: (context: WorkerSetupContext) => Promise<() => Promise<unknown>>
@ -36,7 +38,10 @@ export default function workerInit(options: {
post: v => processSend(v), post: v => processSend(v),
on: cb => processOn('message', cb), on: cb => processOn('message', cb),
off: cb => processOff('message', cb), off: cb => processOff('message', cb),
teardown: () => processRemoveAllListeners('message'), teardown: () => {
processRemoveAllListeners('message')
processOff('error', onError)
},
runTests: (state, traces) => executeTests('run', state, traces), runTests: (state, traces) => executeTests('run', state, traces),
collectTests: (state, traces) => executeTests('collect', state, traces), collectTests: (state, traces) => executeTests('collect', state, traces),
setup: options.setup, setup: options.setup,
@ -51,3 +56,11 @@ export default function workerInit(options: {
} }
} }
} }
// Prevent leaving worker in loops where it tries to send message to closed main
// thread, errors, and tries to send the error.
function onError(error: any) {
if (error?.code === 'ERR_IPC_CHANNEL_CLOSED' || error?.code === 'EPIPE') {
processExit(1)
}
}

View File

@ -0,0 +1,6 @@
import { test } from 'vitest'
test('slow timeouting test', { timeout: 30_000 }, async () => {
console.log("Running slow timeouting test")
await new Promise(resolve => setTimeout(resolve, 40_000))
})

View File

@ -0,0 +1,43 @@
import { Readable, Writable } from 'node:stream'
import { stripVTControlCharacters } from 'node:util'
import { createDefer } from '@vitest/utils/helpers'
import { expect, test } from 'vitest'
import { createVitest, registerConsoleShortcuts } from 'vitest/node'
test('can force cancel a run', async () => {
const onExit = vi.fn<never>()
const exit = process.exit
onTestFinished(() => {
process.exit = exit
})
process.exit = onExit
const onTestCaseReady = createDefer<void>()
const vitest = await createVitest('test', {
root: 'fixtures/cancel-run',
reporters: [{ onTestCaseReady: () => onTestCaseReady.resolve() }],
})
onTestFinished(() => vitest.close())
const stdin = new Readable({ read: () => '' }) as NodeJS.ReadStream
stdin.isTTY = true
stdin.setRawMode = () => stdin
registerConsoleShortcuts(vitest, stdin, new Writable())
const onLog = vi.spyOn(vitest.logger, 'log').mockImplementation(() => {})
const promise = vitest.start()
await onTestCaseReady
// First CTRL+c should log warning about graceful exit
stdin.emit('data', '\x03')
const logs = onLog.mock.calls.map(log => stripVTControlCharacters(log[0] || '').trim())
expect(logs).toContain('Cancelling test run. Press CTRL+c again to exit forcefully.')
// Second CTRL+c should stop run
stdin.emit('data', '\x03')
await promise
expect(onExit).toHaveBeenCalled()
})