From 60e355a39926a0d55ec1040c3714db84c5d4666f Mon Sep 17 00:00:00 2001 From: waleed Date: Wed, 24 Jun 2026 20:14:29 -0700 Subject: [PATCH 1/2] refactor(realtime): type the socket event-handler boundary with @sim/realtime-protocol Replace the (data: any) event-handler types in socket-provider.tsx with precise broadcast types that mirror the exact payloads emitted by the realtime Socket.IO server (apps/realtime/src/handlers/** and rooms/**). Add @sim/realtime-protocol/events with the canonical wire types for the broadcast/confirmation events the server emits: WorkflowOperationBroadcast, SubblockUpdateBroadcast, VariableUpdateBroadcast, CursorUpdateBroadcast, SelectionUpdateBroadcast, the four workflow-lifecycle broadcasts, and OperationConfirmed/Failed. Typing change only; zero runtime/logic changes. Store-internal any (rehydrate state, subblock map, emit payloads) is left untouched as out of scope. --- .../workspace/providers/socket-provider.tsx | 110 +++++++++------- packages/realtime-protocol/package.json | 4 + packages/realtime-protocol/src/events.ts | 121 ++++++++++++++++++ 3 files changed, 188 insertions(+), 47 deletions(-) create mode 100644 packages/realtime-protocol/src/events.ts diff --git a/apps/sim/app/workspace/providers/socket-provider.tsx b/apps/sim/app/workspace/providers/socket-provider.tsx index 47587fe4f0e..3e5d6fa9040 100644 --- a/apps/sim/app/workspace/providers/socket-provider.tsx +++ b/apps/sim/app/workspace/providers/socket-provider.tsx @@ -11,6 +11,19 @@ import { useState, } from 'react' import { createLogger } from '@sim/logger' +import type { + CursorUpdateBroadcast, + OperationConfirmedBroadcast, + OperationFailedBroadcast, + SelectionUpdateBroadcast, + SubblockUpdateBroadcast, + VariableUpdateBroadcast, + WorkflowDeletedBroadcast, + WorkflowDeployedBroadcast, + WorkflowOperationBroadcast, + WorkflowRevertedBroadcast, + WorkflowUpdatedBroadcast, +} from '@sim/realtime-protocol/events' import { generateId } from '@sim/utils/id' import { backoffWithJitter } from '@sim/utils/retry' import { useParams } from 'next/navigation' @@ -92,18 +105,18 @@ interface SocketContextType { emitCursorUpdate: (cursor: { x: number; y: number } | null) => void emitSelectionUpdate: (selection: { type: 'block' | 'edge' | 'none'; id?: string }) => void - onWorkflowOperation: (handler: (data: any) => void) => void - onSubblockUpdate: (handler: (data: any) => void) => void - onVariableUpdate: (handler: (data: any) => void) => void - - onCursorUpdate: (handler: (data: any) => void) => void - onSelectionUpdate: (handler: (data: any) => void) => void - onWorkflowDeleted: (handler: (data: any) => void) => void - onWorkflowReverted: (handler: (data: any) => void) => void - onWorkflowUpdated: (handler: (data: any) => void) => void - onWorkflowDeployed: (handler: (data: any) => void) => void - onOperationConfirmed: (handler: (data: any) => void) => void - onOperationFailed: (handler: (data: any) => void) => void + onWorkflowOperation: (handler: (data: WorkflowOperationBroadcast) => void) => void + onSubblockUpdate: (handler: (data: SubblockUpdateBroadcast) => void) => void + onVariableUpdate: (handler: (data: VariableUpdateBroadcast) => void) => void + + onCursorUpdate: (handler: (data: CursorUpdateBroadcast) => void) => void + onSelectionUpdate: (handler: (data: SelectionUpdateBroadcast) => void) => void + onWorkflowDeleted: (handler: (data: WorkflowDeletedBroadcast) => void) => void + onWorkflowReverted: (handler: (data: WorkflowRevertedBroadcast) => void) => void + onWorkflowUpdated: (handler: (data: WorkflowUpdatedBroadcast) => void) => void + onWorkflowDeployed: (handler: (data: WorkflowDeployedBroadcast) => void) => void + onOperationConfirmed: (handler: (data: OperationConfirmedBroadcast) => void) => void + onOperationFailed: (handler: (data: OperationFailedBroadcast) => void) => void } const SocketContext = createContext({ @@ -173,17 +186,17 @@ export function SocketProvider({ children, user }: SocketProviderProps) { explicitWorkflowIdRef.current = explicitWorkflowId const eventHandlers = useRef<{ - workflowOperation?: (data: any) => void - subblockUpdate?: (data: any) => void - variableUpdate?: (data: any) => void - cursorUpdate?: (data: any) => void - selectionUpdate?: (data: any) => void - workflowDeleted?: (data: any) => void - workflowReverted?: (data: any) => void - workflowUpdated?: (data: any) => void - workflowDeployed?: (data: any) => void - operationConfirmed?: (data: any) => void - operationFailed?: (data: any) => void + workflowOperation?: (data: WorkflowOperationBroadcast) => void + subblockUpdate?: (data: SubblockUpdateBroadcast) => void + variableUpdate?: (data: VariableUpdateBroadcast) => void + cursorUpdate?: (data: CursorUpdateBroadcast) => void + selectionUpdate?: (data: SelectionUpdateBroadcast) => void + workflowDeleted?: (data: WorkflowDeletedBroadcast) => void + workflowReverted?: (data: WorkflowRevertedBroadcast) => void + workflowUpdated?: (data: WorkflowUpdatedBroadcast) => void + workflowDeployed?: (data: WorkflowDeployedBroadcast) => void + operationConfirmed?: (data: OperationConfirmedBroadcast) => void + operationFailed?: (data: OperationFailedBroadcast) => void }>({}) const positionUpdateTimeouts = useRef>(new Map()) @@ -555,19 +568,19 @@ export function SocketProvider({ children, user }: SocketProviderProps) { executeJoinCommands(result.commands) }) - socketInstance.on('workflow-operation', (data) => { + socketInstance.on('workflow-operation', (data: WorkflowOperationBroadcast) => { eventHandlers.current.workflowOperation?.(data) }) - socketInstance.on('subblock-update', (data) => { + socketInstance.on('subblock-update', (data: SubblockUpdateBroadcast) => { eventHandlers.current.subblockUpdate?.(data) }) - socketInstance.on('variable-update', (data) => { + socketInstance.on('variable-update', (data: VariableUpdateBroadcast) => { eventHandlers.current.variableUpdate?.(data) }) - socketInstance.on('workflow-deleted', (data) => { + socketInstance.on('workflow-deleted', (data: WorkflowDeletedBroadcast) => { logger.warn(`Workflow ${data.workflowId} has been deleted`) const result = joinControllerRef.current.handleWorkflowDeleted(data.workflowId) if (result.shouldClearCurrent) { @@ -577,17 +590,17 @@ export function SocketProvider({ children, user }: SocketProviderProps) { eventHandlers.current.workflowDeleted?.(data) }) - socketInstance.on('workflow-reverted', (data) => { + socketInstance.on('workflow-reverted', (data: WorkflowRevertedBroadcast) => { logger.info(`Workflow ${data.workflowId} has been reverted to deployed state`) eventHandlers.current.workflowReverted?.(data) }) - socketInstance.on('workflow-updated', (data) => { + socketInstance.on('workflow-updated', (data: WorkflowUpdatedBroadcast) => { logger.info(`Workflow ${data.workflowId} has been updated externally`) eventHandlers.current.workflowUpdated?.(data) }) - socketInstance.on('workflow-deployed', (data) => { + socketInstance.on('workflow-deployed', (data: WorkflowDeployedBroadcast) => { logger.info(`Workflow ${data.workflowId} deployment state changed`) eventHandlers.current.workflowDeployed?.(data) }) @@ -647,17 +660,17 @@ export function SocketProvider({ children, user }: SocketProviderProps) { return true } - socketInstance.on('operation-confirmed', (data) => { + socketInstance.on('operation-confirmed', (data: OperationConfirmedBroadcast) => { logger.debug('Operation confirmed', { operationId: data.operationId }) eventHandlers.current.operationConfirmed?.(data) }) - socketInstance.on('operation-failed', (data) => { + socketInstance.on('operation-failed', (data: OperationFailedBroadcast) => { logger.warn('Operation failed', { operationId: data.operationId, error: data.error }) eventHandlers.current.operationFailed?.(data) }) - socketInstance.on('cursor-update', (data) => { + socketInstance.on('cursor-update', (data: CursorUpdateBroadcast) => { if (!isWorkflowVisible()) { return } @@ -675,7 +688,7 @@ export function SocketProvider({ children, user }: SocketProviderProps) { eventHandlers.current.cursorUpdate?.(data) }) - socketInstance.on('selection-update', (data) => { + socketInstance.on('selection-update', (data: SelectionUpdateBroadcast) => { if (!isWorkflowVisible()) { return } @@ -1045,47 +1058,50 @@ export function SocketProvider({ children, user }: SocketProviderProps) { [socket, currentWorkflowId, isWorkflowVisible] ) - const onWorkflowOperation = useCallback((handler: (data: any) => void) => { + const onWorkflowOperation = useCallback((handler: (data: WorkflowOperationBroadcast) => void) => { eventHandlers.current.workflowOperation = handler }, []) - const onSubblockUpdate = useCallback((handler: (data: any) => void) => { + const onSubblockUpdate = useCallback((handler: (data: SubblockUpdateBroadcast) => void) => { eventHandlers.current.subblockUpdate = handler }, []) - const onVariableUpdate = useCallback((handler: (data: any) => void) => { + const onVariableUpdate = useCallback((handler: (data: VariableUpdateBroadcast) => void) => { eventHandlers.current.variableUpdate = handler }, []) - const onCursorUpdate = useCallback((handler: (data: any) => void) => { + const onCursorUpdate = useCallback((handler: (data: CursorUpdateBroadcast) => void) => { eventHandlers.current.cursorUpdate = handler }, []) - const onSelectionUpdate = useCallback((handler: (data: any) => void) => { + const onSelectionUpdate = useCallback((handler: (data: SelectionUpdateBroadcast) => void) => { eventHandlers.current.selectionUpdate = handler }, []) - const onWorkflowDeleted = useCallback((handler: (data: any) => void) => { + const onWorkflowDeleted = useCallback((handler: (data: WorkflowDeletedBroadcast) => void) => { eventHandlers.current.workflowDeleted = handler }, []) - const onWorkflowReverted = useCallback((handler: (data: any) => void) => { + const onWorkflowReverted = useCallback((handler: (data: WorkflowRevertedBroadcast) => void) => { eventHandlers.current.workflowReverted = handler }, []) - const onWorkflowUpdated = useCallback((handler: (data: any) => void) => { + const onWorkflowUpdated = useCallback((handler: (data: WorkflowUpdatedBroadcast) => void) => { eventHandlers.current.workflowUpdated = handler }, []) - const onWorkflowDeployed = useCallback((handler: (data: any) => void) => { + const onWorkflowDeployed = useCallback((handler: (data: WorkflowDeployedBroadcast) => void) => { eventHandlers.current.workflowDeployed = handler }, []) - const onOperationConfirmed = useCallback((handler: (data: any) => void) => { - eventHandlers.current.operationConfirmed = handler - }, []) + const onOperationConfirmed = useCallback( + (handler: (data: OperationConfirmedBroadcast) => void) => { + eventHandlers.current.operationConfirmed = handler + }, + [] + ) - const onOperationFailed = useCallback((handler: (data: any) => void) => { + const onOperationFailed = useCallback((handler: (data: OperationFailedBroadcast) => void) => { eventHandlers.current.operationFailed = handler }, []) diff --git a/packages/realtime-protocol/package.json b/packages/realtime-protocol/package.json index 1bf2411e848..4f026e9d2ac 100644 --- a/packages/realtime-protocol/package.json +++ b/packages/realtime-protocol/package.json @@ -14,6 +14,10 @@ "types": "./src/constants.ts", "default": "./src/constants.ts" }, + "./events": { + "types": "./src/events.ts", + "default": "./src/events.ts" + }, "./schemas": { "types": "./src/schemas.ts", "default": "./src/schemas.ts" diff --git a/packages/realtime-protocol/src/events.ts b/packages/realtime-protocol/src/events.ts new file mode 100644 index 00000000000..9f7d86695da --- /dev/null +++ b/packages/realtime-protocol/src/events.ts @@ -0,0 +1,121 @@ +import type { OperationTarget, SocketOperation } from './constants' + +/** + * Wire types for the broadcast/confirmation events the realtime Socket.IO server + * emits to clients. These mirror the exact object literals emitted by + * `apps/realtime/src/handlers/**` and `apps/realtime/src/rooms/**`, and are the + * canonical types consumed by the client socket transport + * (`apps/sim/app/workspace/providers/socket-provider.tsx`). + * + * Payload bodies that the transport forwards opaquely are typed `unknown` rather + * than a concrete operation union, because the transport never narrows them — the + * collaborative-workflow consumer dispatches on `operation`/`target` itself. + */ + +/** A live-presence cursor position broadcast over the socket. */ +export interface CursorPosition { + x: number + y: number +} + +/** A live-presence selection broadcast over the socket. */ +export interface PresenceSelection { + type: 'block' | 'edge' | 'none' + id?: string +} + +/** + * `workflow-operation` broadcast. The server re-broadcasts the originating + * operation envelope plus sender identity and operation metadata. + */ +export interface WorkflowOperationBroadcast { + operation: SocketOperation | string + target: OperationTarget | string + payload: unknown + timestamp: number + senderId: string + userId: string + userName: string + metadata: { + workflowId: string + operationId: string + isPositionUpdate?: boolean + isBatchPositionUpdate?: boolean + } +} + +/** `subblock-update` broadcast. */ +export interface SubblockUpdateBroadcast { + workflowId: string + blockId: string + subblockId: string + value: unknown + timestamp: number +} + +/** `variable-update` broadcast. */ +export interface VariableUpdateBroadcast { + workflowId: string + variableId: string + field: string + value: unknown + timestamp: number +} + +/** `cursor-update` presence broadcast for a single remote user. */ +export interface CursorUpdateBroadcast { + socketId: string + userId: string + userName: string + avatarUrl?: string | null + cursor: CursorPosition +} + +/** `selection-update` presence broadcast for a single remote user. */ +export interface SelectionUpdateBroadcast { + socketId: string + userId: string + userName: string + avatarUrl?: string | null + selection: PresenceSelection +} + +/** `workflow-deleted` lifecycle broadcast. */ +export interface WorkflowDeletedBroadcast { + workflowId: string + message: string + timestamp: number +} + +/** `workflow-reverted` lifecycle broadcast. */ +export interface WorkflowRevertedBroadcast { + workflowId: string + message: string + timestamp: number +} + +/** `workflow-updated` lifecycle broadcast. */ +export interface WorkflowUpdatedBroadcast { + workflowId: string + message: string + timestamp: number +} + +/** `workflow-deployed` lifecycle broadcast. */ +export interface WorkflowDeployedBroadcast { + workflowId: string + timestamp: number +} + +/** `operation-confirmed` ack for a previously-emitted operation. */ +export interface OperationConfirmedBroadcast { + operationId: string + serverTimestamp: number +} + +/** `operation-failed` rejection for a previously-emitted operation. */ +export interface OperationFailedBroadcast { + operationId: string + error: string + retryable?: boolean +} From 881caa3b04629cb6fe99402310c94d4284a9471c Mon Sep 17 00:00:00 2001 From: waleed Date: Wed, 24 Jun 2026 20:36:48 -0700 Subject: [PATCH 2/2] fix(realtime): type cursor-update broadcast cursor as nullable MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The client emits 'cursor-update' with { cursor: null } when a remote user's cursor leaves the canvas, and the server re-broadcasts it verbatim, so receivers genuinely get cursor: null. Type CursorUpdateBroadcast.cursor as CursorPosition | null to match the wire. (selection stays non-null — it signals absence via type: 'none', never null.) --- packages/realtime-protocol/src/events.ts | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/packages/realtime-protocol/src/events.ts b/packages/realtime-protocol/src/events.ts index 9f7d86695da..95f630de80e 100644 --- a/packages/realtime-protocol/src/events.ts +++ b/packages/realtime-protocol/src/events.ts @@ -68,7 +68,8 @@ export interface CursorUpdateBroadcast { userId: string userName: string avatarUrl?: string | null - cursor: CursorPosition + /** `null` when the remote user's cursor leaves the canvas (the client emits `{ cursor: null }`). */ + cursor: CursorPosition | null } /** `selection-update` presence broadcast for a single remote user. */