Skip to content

Commit f8ae249

Browse files
authored
improvement(executor): faster, more responsive workflow cancellation (#4630)
* improvement(executor): faster, more responsive workflow cancellation * fix(executor): publish cancellation event on Redis write failure * fix(tests): migrate handler test assertions to executeTool options object * chore(tests): rename stale 'cancellation check interval' test after polling removal
1 parent fffb879 commit f8ae249

32 files changed

Lines changed: 470 additions & 206 deletions

File tree

apps/sim/executor/execution/engine.test.ts

Lines changed: 107 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,27 @@
44
import { sleep } from '@sim/utils/helpers'
55
import { afterEach, beforeEach, describe, expect, it, type Mock, vi } from 'vitest'
66

7+
const { mockCancellationSubscribers } = vi.hoisted(() => ({
8+
mockCancellationSubscribers: new Set<(event: { executionId: string }) => void>(),
9+
}))
10+
711
vi.mock('@/lib/execution/cancellation', () => ({
812
isExecutionCancelled: vi.fn(),
913
isRedisCancellationEnabled: vi.fn(),
14+
getCancellationChannel: () => ({
15+
publish: (event: { executionId: string }) => {
16+
for (const handler of mockCancellationSubscribers) handler(event)
17+
},
18+
subscribe: (handler: (event: { executionId: string }) => void) => {
19+
mockCancellationSubscribers.add(handler)
20+
return () => {
21+
mockCancellationSubscribers.delete(handler)
22+
}
23+
},
24+
dispose: () => {
25+
mockCancellationSubscribers.clear()
26+
},
27+
}),
1028
}))
1129

1230
import { isExecutionCancelled, isRedisCancellationEnabled } from '@/lib/execution/cancellation'
@@ -115,6 +133,7 @@ function createMockNodeOrchestrator(executeDelay = 0): MockNodeOrchestrator {
115133
describe('ExecutionEngine', () => {
116134
beforeEach(() => {
117135
vi.clearAllMocks()
136+
mockCancellationSubscribers.clear()
118137
;(isExecutionCancelled as Mock).mockResolvedValue(false)
119138
;(isRedisCancellationEnabled as Mock).mockReturnValue(false)
120139
})
@@ -346,7 +365,93 @@ describe('ExecutionEngine', () => {
346365
expect(result.status).toBe('cancelled')
347366
})
348367

349-
it('should respect cancellation check interval', async () => {
368+
it('wakes from a slow in-flight node when a pub/sub cancellation arrives', async () => {
369+
;(isRedisCancellationEnabled as Mock).mockReturnValue(true)
370+
;(isExecutionCancelled as Mock).mockResolvedValue(false)
371+
372+
const startNode = createMockNode('start', 'starter')
373+
const slowNode = createMockNode('slow', 'function')
374+
startNode.outgoingEdges.set('edge1', { target: 'slow' })
375+
376+
const dag = createMockDAG([startNode, slowNode])
377+
const context = createMockContext({ executionId: 'pubsub-execution' })
378+
const edgeManager = createMockEdgeManager((node) => (node.id === 'start' ? ['slow'] : []))
379+
const nodeOrchestrator = createMockNodeOrchestrator(500)
380+
381+
const engine = new ExecutionEngine(context, dag, edgeManager, nodeOrchestrator)
382+
const executionPromise = engine.run('start')
383+
384+
setTimeout(() => {
385+
for (const handler of mockCancellationSubscribers) {
386+
handler({ executionId: 'pubsub-execution' })
387+
}
388+
}, 5)
389+
390+
const startTime = Date.now()
391+
const result = await executionPromise
392+
const duration = Date.now() - startTime
393+
394+
expect(result.status).toBe('cancelled')
395+
expect(duration).toBeLessThan(100)
396+
})
397+
398+
it('ignores pub/sub events targeting other executions', async () => {
399+
;(isRedisCancellationEnabled as Mock).mockReturnValue(true)
400+
;(isExecutionCancelled as Mock).mockResolvedValue(false)
401+
402+
const startNode = createMockNode('start', 'starter')
403+
const dag = createMockDAG([startNode])
404+
const context = createMockContext({ executionId: 'execution-a' })
405+
const edgeManager = createMockEdgeManager()
406+
const nodeOrchestrator = createMockNodeOrchestrator()
407+
408+
const engine = new ExecutionEngine(context, dag, edgeManager, nodeOrchestrator)
409+
410+
for (const handler of mockCancellationSubscribers) {
411+
handler({ executionId: 'execution-b' })
412+
}
413+
414+
const result = await engine.run('start')
415+
expect(result.status).toBeUndefined()
416+
expect(result.success).toBe(true)
417+
})
418+
419+
it('unsubscribes from the cancellation channel after run completes', async () => {
420+
;(isRedisCancellationEnabled as Mock).mockReturnValue(true)
421+
;(isExecutionCancelled as Mock).mockResolvedValue(false)
422+
423+
const startNode = createMockNode('start', 'starter')
424+
const dag = createMockDAG([startNode])
425+
const context = createMockContext({ executionId: 'cleanup-execution' })
426+
const edgeManager = createMockEdgeManager()
427+
const nodeOrchestrator = createMockNodeOrchestrator()
428+
429+
const engine = new ExecutionEngine(context, dag, edgeManager, nodeOrchestrator)
430+
expect(mockCancellationSubscribers.size).toBe(1)
431+
432+
await engine.run('start')
433+
434+
expect(mockCancellationSubscribers.size).toBe(0)
435+
})
436+
437+
it('honours the durable backstop when cancelled before subscribing', async () => {
438+
;(isRedisCancellationEnabled as Mock).mockReturnValue(true)
439+
;(isExecutionCancelled as Mock).mockResolvedValue(true)
440+
441+
const startNode = createMockNode('start', 'starter')
442+
const dag = createMockDAG([startNode])
443+
const context = createMockContext()
444+
const edgeManager = createMockEdgeManager()
445+
const nodeOrchestrator = createMockNodeOrchestrator()
446+
447+
const engine = new ExecutionEngine(context, dag, edgeManager, nodeOrchestrator)
448+
const result = await engine.run('start')
449+
450+
expect(result.status).toBe('cancelled')
451+
expect(nodeOrchestrator.executionCount).toBe(0)
452+
})
453+
454+
it('calls isExecutionCancelled once as the startup backstop check', async () => {
350455
;(isRedisCancellationEnabled as Mock).mockReturnValue(true)
351456
;(isExecutionCancelled as Mock).mockResolvedValue(false)
352457

@@ -359,7 +464,7 @@ describe('ExecutionEngine', () => {
359464
const engine = new ExecutionEngine(context, dag, edgeManager, nodeOrchestrator)
360465
await engine.run('start')
361466

362-
expect((isExecutionCancelled as Mock).mock.calls.length).toBeGreaterThanOrEqual(1)
467+
expect((isExecutionCancelled as Mock).mock.calls.length).toBe(1)
363468
})
364469
})
365470

apps/sim/executor/execution/engine.ts

Lines changed: 58 additions & 71 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,10 @@
11
import { createLogger, type Logger } from '@sim/logger'
22
import { toError } from '@sim/utils/errors'
3-
import { isExecutionCancelled, isRedisCancellationEnabled } from '@/lib/execution/cancellation'
3+
import {
4+
getCancellationChannel,
5+
isExecutionCancelled,
6+
isRedisCancellationEnabled,
7+
} from '@/lib/execution/cancellation'
48
import { BlockType } from '@/executor/constants'
59
import type { DAG } from '@/executor/dag/builder'
610
import type { EdgeManager } from '@/executor/execution/edge-manager'
@@ -31,11 +35,9 @@ export class ExecutionEngine {
3135
private errorFlag = false
3236
private stoppedEarlyFlag = false
3337
private executionError: Error | null = null
34-
private lastCancellationCheck = 0
35-
private readonly useRedisCancellation: boolean
36-
private readonly CANCELLATION_CHECK_INTERVAL_MS = 500
37-
private abortPromise: Promise<void> | null = null
38-
private abortResolve: (() => void) | null = null
38+
private abortPromise!: Promise<void>
39+
private abortResolve!: () => void
40+
private cancellationUnsubscribe: (() => void) | null = null
3941
private execLogger: Logger
4042

4143
constructor(
@@ -45,7 +47,6 @@ export class ExecutionEngine {
4547
private nodeOrchestrator: NodeExecutionOrchestrator
4648
) {
4749
this.allowResumeTriggers = this.context.metadata.resumeFromSnapshot === true
48-
this.useRedisCancellation = isRedisCancellationEnabled() && !!this.context.executionId
4950
this.execLogger = logger.withMetadata({
5051
workflowId: this.context.workflowId,
5152
workspaceId: this.context.workspaceId,
@@ -54,72 +55,64 @@ export class ExecutionEngine {
5455
requestId: this.context.metadata.requestId,
5556
})
5657
this.initializeAbortHandler()
58+
this.subscribeToCancellationChannel()
59+
}
60+
61+
private subscribeToCancellationChannel(): void {
62+
if (!this.context.executionId) return
63+
const executionId = this.context.executionId
64+
this.cancellationUnsubscribe = getCancellationChannel().subscribe((event) => {
65+
if (event.executionId !== executionId) return
66+
this.execLogger.info('Execution cancelled via pub/sub', { executionId })
67+
this.signalCancelled()
68+
})
5769
}
5870

59-
/**
60-
* Sets up a single abort promise that can be reused throughout execution.
61-
* This avoids creating multiple event listeners and potential memory leaks.
62-
*/
6371
private initializeAbortHandler(): void {
72+
this.abortPromise = new Promise<void>((resolve) => {
73+
this.abortResolve = resolve
74+
})
75+
6476
if (!this.context.abortSignal) return
6577

6678
if (this.context.abortSignal.aborted) {
67-
this.cancelledFlag = true
68-
this.abortPromise = Promise.resolve()
79+
this.signalCancelled()
6980
return
7081
}
7182

72-
this.abortPromise = new Promise<void>((resolve) => {
73-
this.abortResolve = resolve
74-
})
75-
76-
this.context.abortSignal.addEventListener(
77-
'abort',
78-
() => {
79-
this.cancelledFlag = true
80-
this.abortResolve?.()
81-
},
82-
{ once: true }
83-
)
83+
this.context.abortSignal.addEventListener('abort', () => this.signalCancelled(), { once: true })
8484
}
8585

86-
private async checkCancellation(): Promise<boolean> {
87-
if (this.cancelledFlag) {
88-
return true
89-
}
90-
91-
if (this.useRedisCancellation) {
92-
const now = Date.now()
93-
if (now - this.lastCancellationCheck < this.CANCELLATION_CHECK_INTERVAL_MS) {
94-
return false
95-
}
96-
this.lastCancellationCheck = now
86+
private signalCancelled(): void {
87+
if (this.cancelledFlag) return
88+
this.cancelledFlag = true
89+
this.abortResolve()
90+
}
9791

98-
const cancelled = await isExecutionCancelled(this.context.executionId!)
99-
if (cancelled) {
100-
this.cancelledFlag = true
101-
this.execLogger.info('Execution cancelled via Redis', {
102-
executionId: this.context.executionId,
103-
})
104-
}
105-
return cancelled
106-
}
92+
private checkCancellation(): boolean {
93+
return this.cancelledFlag
94+
}
10795

108-
if (this.context.abortSignal?.aborted) {
109-
this.cancelledFlag = true
110-
return true
96+
/** Catches cancellations published before this engine subscribed (e.g. resume from snapshot). */
97+
private async checkCancellationBackstop(): Promise<void> {
98+
if (!this.context.executionId || !isRedisCancellationEnabled()) return
99+
const cancelled = await isExecutionCancelled(this.context.executionId)
100+
if (cancelled) {
101+
this.execLogger.info('Execution already cancelled at engine start (Redis backstop)', {
102+
executionId: this.context.executionId,
103+
})
104+
this.signalCancelled()
111105
}
112-
113-
return false
114106
}
115107

116108
async run(triggerBlockId?: string): Promise<ExecutionResult> {
117109
const startTime = performance.now()
118110
try {
119111
this.initializeQueue(triggerBlockId)
112+
await this.checkCancellationBackstop()
120113

121114
while (this.hasWork()) {
122-
if ((await this.checkCancellation()) || this.errorFlag || this.stoppedEarlyFlag) {
115+
if (this.checkCancellation() || this.errorFlag || this.stoppedEarlyFlag) {
123116
break
124117
}
125118
await this.processQueue()
@@ -194,6 +187,15 @@ export class ExecutionEngine {
194187
attachExecutionResult(error, executionResult)
195188
}
196189
throw error
190+
} finally {
191+
this.cleanup()
192+
}
193+
}
194+
195+
private cleanup(): void {
196+
if (this.cancellationUnsubscribe) {
197+
this.cancellationUnsubscribe()
198+
this.cancellationUnsubscribe = null
197199
}
198200
}
199201

@@ -238,32 +240,17 @@ export class ExecutionEngine {
238240

239241
private async waitForAnyExecution(): Promise<void> {
240242
if (this.executing.size > 0) {
241-
const abortPromise = this.getAbortPromise()
242-
if (abortPromise) {
243-
await Promise.race([...this.executing, abortPromise])
244-
} else {
245-
await Promise.race(this.executing)
246-
}
243+
await Promise.race([...this.executing, this.abortPromise])
247244
}
248245
}
249246

250247
private async waitForAllExecutions(): Promise<void> {
251-
const abortPromise = this.getAbortPromise()
252-
if (abortPromise) {
253-
await Promise.race([Promise.all(this.executing), abortPromise])
254-
} else {
255-
await Promise.all(this.executing)
248+
await Promise.race([Promise.all(this.executing), this.abortPromise])
249+
if (this.executing.size > 0) {
250+
await Promise.allSettled(this.executing)
256251
}
257252
}
258253

259-
/**
260-
* Returns the cached abort promise. This is safe to call multiple times
261-
* as it reuses the same promise instance created during initialization.
262-
*/
263-
private getAbortPromise(): Promise<void> | null {
264-
return this.abortPromise
265-
}
266-
267254
private async withQueueLock<T>(fn: () => Promise<T> | T): Promise<T> {
268255
const prevLock = this.queueLock
269256
let resolveLock: () => void
@@ -363,7 +350,7 @@ export class ExecutionEngine {
363350

364351
private async processQueue(): Promise<void> {
365352
while (this.readyQueue.length > 0) {
366-
if ((await this.checkCancellation()) || this.errorFlag) {
353+
if (this.checkCancellation() || this.errorFlag) {
367354
break
368355
}
369356
const nodeId = this.dequeue()

apps/sim/executor/handlers/api/api-handler.test.ts

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -116,8 +116,7 @@ describe('ApiBlockHandler', () => {
116116
body: { key: 'value' }, // Expect parsed body
117117
_context: { workflowId: 'test-workflow-id' },
118118
},
119-
false, // skipPostProcess
120-
mockContext // execution context
119+
{ executionContext: mockContext }
121120
)
122121
expect(result).toEqual(expectedOutput)
123122
})
@@ -177,8 +176,7 @@ describe('ApiBlockHandler', () => {
177176
expect(mockExecuteTool).toHaveBeenCalledWith(
178177
'http_request',
179178
expect.objectContaining({ body: expectedParsedBody }),
180-
false, // skipPostProcess
181-
mockContext // execution context
179+
{ executionContext: mockContext }
182180
)
183181
})
184182

@@ -193,8 +191,7 @@ describe('ApiBlockHandler', () => {
193191
expect(mockExecuteTool).toHaveBeenCalledWith(
194192
'http_request',
195193
expect.objectContaining({ body: 'This is plain text' }),
196-
false, // skipPostProcess
197-
mockContext // execution context
194+
{ executionContext: mockContext }
198195
)
199196
})
200197

@@ -209,8 +206,7 @@ describe('ApiBlockHandler', () => {
209206
expect(mockExecuteTool).toHaveBeenCalledWith(
210207
'http_request',
211208
expect.objectContaining({ body: undefined }),
212-
false, // skipPostProcess
213-
mockContext // execution context
209+
{ executionContext: mockContext }
214210
)
215211
})
216212

apps/sim/executor/handlers/api/api-handler.ts

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -78,8 +78,7 @@ export class ApiBlockHandler implements BlockHandler {
7878
callChain: ctx.callChain,
7979
},
8080
},
81-
false,
82-
ctx
81+
{ executionContext: ctx }
8382
)
8483

8584
if (!result.success) {

0 commit comments

Comments
 (0)