Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
110 changes: 63 additions & 47 deletions apps/sim/app/workspace/providers/socket-provider.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,19 @@ import {
useState,
} from 'react'
import { createLogger } from '@sim/logger'
import type {
CursorUpdateBroadcast,
OperationConfirmedBroadcast,
OperationFailedBroadcast,
SelectionUpdateBroadcast,
SubblockUpdateBroadcast,
VariableUpdateBroadcast,
WorkflowDeletedBroadcast,
WorkflowDeployedBroadcast,
WorkflowOperationBroadcast,
WorkflowRevertedBroadcast,
WorkflowUpdatedBroadcast,
} from '@sim/realtime-protocol/events'
import { generateId } from '@sim/utils/id'
import { backoffWithJitter } from '@sim/utils/retry'
import { useParams } from 'next/navigation'
Expand Down Expand Up @@ -92,18 +105,18 @@ interface SocketContextType {

emitCursorUpdate: (cursor: { x: number; y: number } | null) => void
emitSelectionUpdate: (selection: { type: 'block' | 'edge' | 'none'; id?: string }) => void
onWorkflowOperation: (handler: (data: any) => void) => void
onSubblockUpdate: (handler: (data: any) => void) => void
onVariableUpdate: (handler: (data: any) => void) => void

onCursorUpdate: (handler: (data: any) => void) => void
onSelectionUpdate: (handler: (data: any) => void) => void
onWorkflowDeleted: (handler: (data: any) => void) => void
onWorkflowReverted: (handler: (data: any) => void) => void
onWorkflowUpdated: (handler: (data: any) => void) => void
onWorkflowDeployed: (handler: (data: any) => void) => void
onOperationConfirmed: (handler: (data: any) => void) => void
onOperationFailed: (handler: (data: any) => void) => void
onWorkflowOperation: (handler: (data: WorkflowOperationBroadcast) => void) => void
onSubblockUpdate: (handler: (data: SubblockUpdateBroadcast) => void) => void
onVariableUpdate: (handler: (data: VariableUpdateBroadcast) => void) => void

onCursorUpdate: (handler: (data: CursorUpdateBroadcast) => void) => void
onSelectionUpdate: (handler: (data: SelectionUpdateBroadcast) => void) => void
onWorkflowDeleted: (handler: (data: WorkflowDeletedBroadcast) => void) => void
onWorkflowReverted: (handler: (data: WorkflowRevertedBroadcast) => void) => void
onWorkflowUpdated: (handler: (data: WorkflowUpdatedBroadcast) => void) => void
onWorkflowDeployed: (handler: (data: WorkflowDeployedBroadcast) => void) => void
onOperationConfirmed: (handler: (data: OperationConfirmedBroadcast) => void) => void
onOperationFailed: (handler: (data: OperationFailedBroadcast) => void) => void
}

const SocketContext = createContext<SocketContextType>({
Expand Down Expand Up @@ -173,17 +186,17 @@ export function SocketProvider({ children, user }: SocketProviderProps) {
explicitWorkflowIdRef.current = explicitWorkflowId

const eventHandlers = useRef<{
workflowOperation?: (data: any) => void
subblockUpdate?: (data: any) => void
variableUpdate?: (data: any) => void
cursorUpdate?: (data: any) => void
selectionUpdate?: (data: any) => void
workflowDeleted?: (data: any) => void
workflowReverted?: (data: any) => void
workflowUpdated?: (data: any) => void
workflowDeployed?: (data: any) => void
operationConfirmed?: (data: any) => void
operationFailed?: (data: any) => void
workflowOperation?: (data: WorkflowOperationBroadcast) => void
subblockUpdate?: (data: SubblockUpdateBroadcast) => void
variableUpdate?: (data: VariableUpdateBroadcast) => void
cursorUpdate?: (data: CursorUpdateBroadcast) => void
selectionUpdate?: (data: SelectionUpdateBroadcast) => void
workflowDeleted?: (data: WorkflowDeletedBroadcast) => void
workflowReverted?: (data: WorkflowRevertedBroadcast) => void
workflowUpdated?: (data: WorkflowUpdatedBroadcast) => void
workflowDeployed?: (data: WorkflowDeployedBroadcast) => void
operationConfirmed?: (data: OperationConfirmedBroadcast) => void
operationFailed?: (data: OperationFailedBroadcast) => void
}>({})

const positionUpdateTimeouts = useRef<Map<string, number>>(new Map())
Expand Down Expand Up @@ -555,19 +568,19 @@ export function SocketProvider({ children, user }: SocketProviderProps) {
executeJoinCommands(result.commands)
})

socketInstance.on('workflow-operation', (data) => {
socketInstance.on('workflow-operation', (data: WorkflowOperationBroadcast) => {
eventHandlers.current.workflowOperation?.(data)
})

socketInstance.on('subblock-update', (data) => {
socketInstance.on('subblock-update', (data: SubblockUpdateBroadcast) => {
eventHandlers.current.subblockUpdate?.(data)
})

socketInstance.on('variable-update', (data) => {
socketInstance.on('variable-update', (data: VariableUpdateBroadcast) => {
eventHandlers.current.variableUpdate?.(data)
})

socketInstance.on('workflow-deleted', (data) => {
socketInstance.on('workflow-deleted', (data: WorkflowDeletedBroadcast) => {
logger.warn(`Workflow ${data.workflowId} has been deleted`)
const result = joinControllerRef.current.handleWorkflowDeleted(data.workflowId)
if (result.shouldClearCurrent) {
Expand All @@ -577,17 +590,17 @@ export function SocketProvider({ children, user }: SocketProviderProps) {
eventHandlers.current.workflowDeleted?.(data)
})

socketInstance.on('workflow-reverted', (data) => {
socketInstance.on('workflow-reverted', (data: WorkflowRevertedBroadcast) => {
logger.info(`Workflow ${data.workflowId} has been reverted to deployed state`)
eventHandlers.current.workflowReverted?.(data)
})

socketInstance.on('workflow-updated', (data) => {
socketInstance.on('workflow-updated', (data: WorkflowUpdatedBroadcast) => {
logger.info(`Workflow ${data.workflowId} has been updated externally`)
eventHandlers.current.workflowUpdated?.(data)
})

socketInstance.on('workflow-deployed', (data) => {
socketInstance.on('workflow-deployed', (data: WorkflowDeployedBroadcast) => {
logger.info(`Workflow ${data.workflowId} deployment state changed`)
eventHandlers.current.workflowDeployed?.(data)
})
Expand Down Expand Up @@ -647,17 +660,17 @@ export function SocketProvider({ children, user }: SocketProviderProps) {
return true
}

socketInstance.on('operation-confirmed', (data) => {
socketInstance.on('operation-confirmed', (data: OperationConfirmedBroadcast) => {
logger.debug('Operation confirmed', { operationId: data.operationId })
eventHandlers.current.operationConfirmed?.(data)
})

socketInstance.on('operation-failed', (data) => {
socketInstance.on('operation-failed', (data: OperationFailedBroadcast) => {
logger.warn('Operation failed', { operationId: data.operationId, error: data.error })
eventHandlers.current.operationFailed?.(data)
})

socketInstance.on('cursor-update', (data) => {
socketInstance.on('cursor-update', (data: CursorUpdateBroadcast) => {
if (!isWorkflowVisible()) {
return
}
Expand All @@ -675,7 +688,7 @@ export function SocketProvider({ children, user }: SocketProviderProps) {
eventHandlers.current.cursorUpdate?.(data)
})

socketInstance.on('selection-update', (data) => {
socketInstance.on('selection-update', (data: SelectionUpdateBroadcast) => {
if (!isWorkflowVisible()) {
return
}
Expand Down Expand Up @@ -1045,47 +1058,50 @@ export function SocketProvider({ children, user }: SocketProviderProps) {
[socket, currentWorkflowId, isWorkflowVisible]
)

const onWorkflowOperation = useCallback((handler: (data: any) => void) => {
const onWorkflowOperation = useCallback((handler: (data: WorkflowOperationBroadcast) => void) => {
eventHandlers.current.workflowOperation = handler
}, [])

const onSubblockUpdate = useCallback((handler: (data: any) => void) => {
const onSubblockUpdate = useCallback((handler: (data: SubblockUpdateBroadcast) => void) => {
eventHandlers.current.subblockUpdate = handler
}, [])

const onVariableUpdate = useCallback((handler: (data: any) => void) => {
const onVariableUpdate = useCallback((handler: (data: VariableUpdateBroadcast) => void) => {
eventHandlers.current.variableUpdate = handler
}, [])

const onCursorUpdate = useCallback((handler: (data: any) => void) => {
const onCursorUpdate = useCallback((handler: (data: CursorUpdateBroadcast) => void) => {
eventHandlers.current.cursorUpdate = handler
}, [])

const onSelectionUpdate = useCallback((handler: (data: any) => void) => {
const onSelectionUpdate = useCallback((handler: (data: SelectionUpdateBroadcast) => void) => {
eventHandlers.current.selectionUpdate = handler
}, [])

const onWorkflowDeleted = useCallback((handler: (data: any) => void) => {
const onWorkflowDeleted = useCallback((handler: (data: WorkflowDeletedBroadcast) => void) => {
eventHandlers.current.workflowDeleted = handler
}, [])

const onWorkflowReverted = useCallback((handler: (data: any) => void) => {
const onWorkflowReverted = useCallback((handler: (data: WorkflowRevertedBroadcast) => void) => {
eventHandlers.current.workflowReverted = handler
}, [])

const onWorkflowUpdated = useCallback((handler: (data: any) => void) => {
const onWorkflowUpdated = useCallback((handler: (data: WorkflowUpdatedBroadcast) => void) => {
eventHandlers.current.workflowUpdated = handler
}, [])

const onWorkflowDeployed = useCallback((handler: (data: any) => void) => {
const onWorkflowDeployed = useCallback((handler: (data: WorkflowDeployedBroadcast) => void) => {
eventHandlers.current.workflowDeployed = handler
}, [])

const onOperationConfirmed = useCallback((handler: (data: any) => void) => {
eventHandlers.current.operationConfirmed = handler
}, [])
const onOperationConfirmed = useCallback(
(handler: (data: OperationConfirmedBroadcast) => void) => {
eventHandlers.current.operationConfirmed = handler
},
[]
)

const onOperationFailed = useCallback((handler: (data: any) => void) => {
const onOperationFailed = useCallback((handler: (data: OperationFailedBroadcast) => void) => {
eventHandlers.current.operationFailed = handler
}, [])

Expand Down
4 changes: 4 additions & 0 deletions packages/realtime-protocol/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,10 @@
"types": "./src/constants.ts",
"default": "./src/constants.ts"
},
"./events": {
"types": "./src/events.ts",
"default": "./src/events.ts"
},
"./schemas": {
"types": "./src/schemas.ts",
"default": "./src/schemas.ts"
Expand Down
122 changes: 122 additions & 0 deletions packages/realtime-protocol/src/events.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
import type { OperationTarget, SocketOperation } from './constants'

/**
* Wire types for the broadcast/confirmation events the realtime Socket.IO server
* emits to clients. These mirror the exact object literals emitted by
* `apps/realtime/src/handlers/**` and `apps/realtime/src/rooms/**`, and are the
* canonical types consumed by the client socket transport
* (`apps/sim/app/workspace/providers/socket-provider.tsx`).
*
* Payload bodies that the transport forwards opaquely are typed `unknown` rather
* than a concrete operation union, because the transport never narrows them — the
* collaborative-workflow consumer dispatches on `operation`/`target` itself.
*/

/** A live-presence cursor position broadcast over the socket. */
export interface CursorPosition {
x: number
y: number
}

/** A live-presence selection broadcast over the socket. */
export interface PresenceSelection {
type: 'block' | 'edge' | 'none'
id?: string
}

/**
* `workflow-operation` broadcast. The server re-broadcasts the originating
* operation envelope plus sender identity and operation metadata.
*/
export interface WorkflowOperationBroadcast {
operation: SocketOperation | string
target: OperationTarget | string
payload: unknown
timestamp: number
senderId: string
userId: string
userName: string
metadata: {
workflowId: string
operationId: string
isPositionUpdate?: boolean
isBatchPositionUpdate?: boolean
}
}

/** `subblock-update` broadcast. */
export interface SubblockUpdateBroadcast {
workflowId: string
blockId: string
subblockId: string
value: unknown
timestamp: number
}

/** `variable-update` broadcast. */
export interface VariableUpdateBroadcast {
workflowId: string
variableId: string
field: string
value: unknown
timestamp: number
}

/** `cursor-update` presence broadcast for a single remote user. */
export interface CursorUpdateBroadcast {
socketId: string
userId: string
userName: string
avatarUrl?: string | null
/** `null` when the remote user's cursor leaves the canvas (the client emits `{ cursor: null }`). */
cursor: CursorPosition | null
}
Comment thread
waleedlatif1 marked this conversation as resolved.

/** `selection-update` presence broadcast for a single remote user. */
export interface SelectionUpdateBroadcast {
socketId: string
userId: string
userName: string
avatarUrl?: string | null
selection: PresenceSelection
}

/** `workflow-deleted` lifecycle broadcast. */
export interface WorkflowDeletedBroadcast {
workflowId: string
message: string
timestamp: number
}

/** `workflow-reverted` lifecycle broadcast. */
export interface WorkflowRevertedBroadcast {
workflowId: string
message: string
timestamp: number
}

/** `workflow-updated` lifecycle broadcast. */
export interface WorkflowUpdatedBroadcast {
workflowId: string
message: string
timestamp: number
}

/** `workflow-deployed` lifecycle broadcast. */
export interface WorkflowDeployedBroadcast {
workflowId: string
timestamp: number
}

/** `operation-confirmed` ack for a previously-emitted operation. */
export interface OperationConfirmedBroadcast {
operationId: string
serverTimestamp: number
}

/** `operation-failed` rejection for a previously-emitted operation. */
export interface OperationFailedBroadcast {
operationId: string
error: string
retryable?: boolean
}
Loading