Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,10 @@ const TYPEWRITER_MS_PER_CHAR = 15
* value statically — animation fires only for subsequent updates, which in
* practice means SSE-driven workflow completions arriving via
* `useTableEventStream → applyCell()`.
*
* rAF-driven (not `setInterval`) so concurrent reveals batch into one
* render/paint per frame instead of O(cells) uncoordinated reflows; reveal
* length is elapsed-time based so dropped frames catch up rather than slow.
*/
function useTypewriter(text: string | null): string | null {
const [revealed, setRevealed] = useState<string | null>(text)
Expand All @@ -317,14 +321,17 @@ function useTypewriter(text: string | null): string | null {
return
}

const full = text
const start = performance.now()
let raf = 0
const tick = (now: number) => {
const chars = Math.min(full.length, Math.floor((now - start) / TYPEWRITER_MS_PER_CHAR))
setRevealed(full.slice(0, chars))
if (chars < full.length) raf = requestAnimationFrame(tick)
}
setRevealed('')
let i = 0
const id = window.setInterval(() => {
i++
setRevealed(text.slice(0, i))
if (i >= text.length) window.clearInterval(id)
}, TYPEWRITER_MS_PER_CHAR)
return () => window.clearInterval(id)
raf = requestAnimationFrame(tick)
return () => cancelAnimationFrame(raf)
}, [text])

return revealed
Expand Down
70 changes: 62 additions & 8 deletions apps/sim/hooks/queries/tables.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1336,10 +1336,12 @@ export function useRunColumn({ workspaceId, tableId }: RowMutationContext) {
queryClient.getQueryData<TableDefinition>(tableKeys.detail(tableId))?.schema
.workflowGroups ?? []
const groupsById = new Map(groups.map((g) => [g.id, g]))
// Tally cells stamped per row to bump the run-state counter in lockstep.
const stampedByRow: Record<string, number> = {}
const snapshots = await snapshotAndMutateRows(queryClient, tableId, (r) => {
if (targetRowIds && !targetRowIds.has(r.id)) return null
const executions = r.executions ?? {}
let changed = false
let stamped = 0
const next: RowExecutions = { ...executions }
const nextData = { ...r.data }
for (const groupId of targetGroupIds) {
Expand Down Expand Up @@ -1367,20 +1369,72 @@ export function useRunColumn({ workspaceId, tableId }: RowMutationContext) {
if (o.columnName in nextData) nextData[o.columnName] = null
}
}
changed = true
stamped++
}
if (!changed) return null
if (stamped === 0) return null
stampedByRow[r.id] = stamped
return { ...r, data: nextData, executions: next }
})
return { snapshots }

// Bump the counter to match the stamped cells. Without it the "X running"
// badge + gutter Stop stay at zero until a refetch: the optimistic stamp
// already marks the cell in-flight, so the dispatcher's `pending` SSE
// sees no `wasInFlight` transition and never bumps the counter.
const runStateSnapshot = queryClient.getQueryData<TableRunState>(
tableKeys.activeDispatches(tableId)
)
const totalStamped = Object.values(stampedByRow).reduce((s, n) => s + n, 0)
if (totalStamped > 0) {
queryClient.setQueryData<TableRunState>(tableKeys.activeDispatches(tableId), (prev) => {
const base = prev ?? { dispatches: [], runningCellCount: 0, runningByRowId: {} }
const nextByRow = { ...base.runningByRowId }
for (const [rid, n] of Object.entries(stampedByRow)) {
nextByRow[rid] = (nextByRow[rid] ?? 0) + n
}
return {
...base,
runningCellCount: base.runningCellCount + totalStamped,
runningByRowId: nextByRow,
}
})
}
return { snapshots, runStateSnapshot, didBumpRunState: totalStamped > 0 }
},
onError: (_err, _variables, context) => {
if (context?.snapshots) restoreCachedWorkflowCells(queryClient, context.snapshots)
// Roll back the optimistic counter bump (snapshot may be undefined).
if (context?.didBumpRunState) {
queryClient.setQueryData(tableKeys.activeDispatches(tableId), context.runStateSnapshot)
}
},
onSuccess: () => {
// Seed the active-dispatch overlay immediately (insertDispatch ran
// server-side before responding); rows cache stays owned by SSE.
void queryClient.invalidateQueries({ queryKey: tableKeys.activeDispatches(tableId) })
onSuccess: (data, { groupIds, runMode = 'all', rowIds }, context) => {
// Seed the dispatch into the overlay (drives resolveCellExec for
// ahead-of-cursor rows) from the response — refetching would reset the
// optimistic counter to the server's still-zero count.
const dispatchId = data?.data?.dispatchId
if (!dispatchId) {
// No dispatch created → no SSE to reconcile the bump; roll it back.
if (context?.didBumpRunState) {
queryClient.setQueryData(tableKeys.activeDispatches(tableId), context.runStateSnapshot)
}
return
}
queryClient.setQueryData<TableRunState>(tableKeys.activeDispatches(tableId), (prev) => {
const base = prev ?? { dispatches: [], runningCellCount: 0, runningByRowId: {} }
if (base.dispatches.some((d) => d.id === dispatchId)) return base
const dispatch: ActiveDispatch = {
id: dispatchId,
status: 'pending',
mode: runMode,
isManualRun: true,
cursor: -1,
scope: {
groupIds,
...(rowIds && rowIds.length > 0 ? { rowIds } : {}),
},
}
return { ...base, dispatches: [...base.dispatches, dispatch] }
})
Comment thread
TheodoreSpeaks marked this conversation as resolved.
},
})
}
Expand Down
8 changes: 6 additions & 2 deletions apps/sim/lib/table/trigger.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,6 @@
import { createLogger } from '@sim/logger'
import { generateShortId } from '@sim/utils/id'
import type { RowData, TableRow, TableSchema } from '@/lib/table/types'
import { fetchActiveWebhooks } from '@/lib/webhooks/polling/utils'
import { processPolledWebhookEvent } from '@/lib/webhooks/processor'

const logger = createLogger('TableTrigger')

Expand Down Expand Up @@ -57,6 +55,10 @@ export async function fireTableTrigger(
requestId: string
): Promise<void> {
try {
// Lazy: the webhook utils/processor pull in the executor + blocks stack.
// Eager imports would force every `lib/table/service` consumer (e.g. the
// dispatcher) to pay that cold-start even when no trigger fires.
const { fetchActiveWebhooks } = await import('@/lib/webhooks/polling/utils')
const webhooks = await fetchActiveWebhooks('table')
if (webhooks.length === 0) return

Expand All @@ -74,6 +76,8 @@ export async function fireTableTrigger(

if (matching.length === 0) return

const { processPolledWebhookEvent } = await import('@/lib/webhooks/processor')

logger.info(
`[${requestId}] Firing ${matching.length} trigger(s) for ${rows.length} ${eventType} event(s) in table ${tableId}`
)
Expand Down
13 changes: 10 additions & 3 deletions apps/sim/lib/table/workflow-columns.ts
Original file line number Diff line number Diff line change
Expand Up @@ -203,11 +203,18 @@ export function buildPendingRuns(
/** Build the per-cell `{payload, options}` items for `queue.batchEnqueue` /
* `queue.batchEnqueueAndWait`. Hydrates trigger.dev tags, concurrency keys,
* the inline runner, and the cancel key the inline backend uses to map a
* Stop click to the in-flight cell's AbortController. */
* Stop click to the in-flight cell's AbortController.
*
* `runner` is only used by the database backend; trigger.dev triggers by task
* id. The cell-job import pulls in the executor + blocks stack, so skip it on
* trigger.dev to avoid a multi-second dispatcher cold-start. */
export async function buildEnqueueItems(
pendingRuns: WorkflowGroupCellPayload[]
): Promise<Array<{ payload: WorkflowGroupCellPayload; options: EnqueueOptions }>> {
const { executeWorkflowGroupCellJob } = await import('@/background/workflow-column-execution')
const runner = isTriggerDevEnabled
? undefined
: ((await import('@/background/workflow-column-execution'))
.executeWorkflowGroupCellJob as EnqueueOptions['runner'])
return pendingRuns.map((runOpts) => ({
payload: runOpts,
options: {
Expand All @@ -225,7 +232,7 @@ export async function buildEnqueueItems(
concurrencyKey: runOpts.tableId,
concurrencyLimit: TABLE_CONCURRENCY_LIMIT,
tags: cellTagsFor(runOpts),
runner: executeWorkflowGroupCellJob as EnqueueOptions['runner'],
...(runner ? { runner } : {}),
cancelKey: cellCancelKey(runOpts.tableId, runOpts.rowId, runOpts.groupId),
},
}))
Expand Down
Loading