diff --git a/temporal-sdk/src/main/java/io/temporal/internal/client/RootWorkflowClientInvoker.java b/temporal-sdk/src/main/java/io/temporal/internal/client/RootWorkflowClientInvoker.java index 896fdc076..31371e02e 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/client/RootWorkflowClientInvoker.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/client/RootWorkflowClientInvoker.java @@ -120,6 +120,12 @@ public WorkflowSignalOutput signal(WorkflowSignalInput input) { .setRequestId(UUID.randomUUID().toString()) .setHeader(HeaderUtils.toHeaderGrpc(input.getHeader(), null)); + // If this signal is being issued from inside a Nexus operation handler, forward the inbound + // Nexus task links so the SignalWorkflowExecution history event links back to the caller. + if (CurrentNexusOperationContext.isNexusContext()) { + request.addAllLinks(CurrentNexusOperationContext.get().getNexusOperationLinks()); + } + DataConverter dataConverterWitSignalContext = clientOptions .getDataConverter() @@ -129,7 +135,12 @@ public WorkflowSignalOutput signal(WorkflowSignalInput input) { Optional inputArgs = dataConverterWitSignalContext.toPayloads(input.getArguments()); inputArgs.ifPresent(request::setInput); - genericClient.signal(request.build()); + SignalWorkflowExecutionResponse response = genericClient.signal(request.build()); + // Server >=1.31 with EnableCHASMSignalBacklinks returns a backlink pointing at the signal + // event; older servers leave it unset. Propagate when present. + if (CurrentNexusOperationContext.isNexusContext() && response.hasLink()) { + CurrentNexusOperationContext.get().addSignalWorkflowResponseLink(response.getLink()); + } return new WorkflowSignalOutput(); } @@ -148,17 +159,27 @@ public WorkflowSignalWithStartOutput signalWithStart(WorkflowSignalWithStartInpu Optional signalInput = dataConverterWithWorkflowContext.toPayloads(input.getSignalArguments()); - SignalWithStartWorkflowExecutionRequest request = - requestsHelper - .newSignalWithStartWorkflowExecutionRequest( - startRequest, input.getSignalName(), signalInput.orElse(null)) - .build(); + SignalWithStartWorkflowExecutionRequest.Builder requestBuilder = + requestsHelper.newSignalWithStartWorkflowExecutionRequest( + startRequest, input.getSignalName(), signalInput.orElse(null)); + // If this signalWithStart is being issued from inside a Nexus operation handler, forward + // the inbound Nexus task links so both the WorkflowExecutionStarted and + // WorkflowExecutionSignaled events on the callee link back to the caller. + if (CurrentNexusOperationContext.isNexusContext()) { + requestBuilder.addAllLinks(CurrentNexusOperationContext.get().getNexusOperationLinks()); + } + SignalWithStartWorkflowExecutionRequest request = requestBuilder.build(); SignalWithStartWorkflowExecutionResponse response = genericClient.signalWithStart(request); WorkflowExecution execution = WorkflowExecution.newBuilder() .setRunId(response.getRunId()) .setWorkflowId(request.getWorkflowId()) .build(); + // Server >=1.31 with EnableCHASMSignalBacklinks returns a backlink pointing at the signal + // event; older servers leave it unset. Propagate when present. + if (CurrentNexusOperationContext.isNexusContext() && response.hasSignalLink()) { + CurrentNexusOperationContext.get().addSignalWorkflowResponseLink(response.getSignalLink()); + } // TODO currently SignalWithStartWorkflowExecutionResponse doesn't have eagerWorkflowTask. // We should wire it when it's implemented server-side. return new WorkflowSignalWithStartOutput(new WorkflowStartOutput(execution)); diff --git a/temporal-sdk/src/main/java/io/temporal/internal/client/external/GenericWorkflowClient.java b/temporal-sdk/src/main/java/io/temporal/internal/client/external/GenericWorkflowClient.java index 317c2300b..e4e313f57 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/client/external/GenericWorkflowClient.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/client/external/GenericWorkflowClient.java @@ -10,7 +10,7 @@ public interface GenericWorkflowClient { StartWorkflowExecutionResponse start(StartWorkflowExecutionRequest request); - void signal(SignalWorkflowExecutionRequest request); + SignalWorkflowExecutionResponse signal(SignalWorkflowExecutionRequest request); SignalWithStartWorkflowExecutionResponse signalWithStart( SignalWithStartWorkflowExecutionRequest request); diff --git a/temporal-sdk/src/main/java/io/temporal/internal/client/external/GenericWorkflowClientImpl.java b/temporal-sdk/src/main/java/io/temporal/internal/client/external/GenericWorkflowClientImpl.java index 58ad1e8f1..2b8e31c31 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/client/external/GenericWorkflowClientImpl.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/client/external/GenericWorkflowClientImpl.java @@ -61,13 +61,13 @@ private static Map tagsForStartWorkflow(StartWorkflowExecutionRe } @Override - public void signal(SignalWorkflowExecutionRequest request) { + public SignalWorkflowExecutionResponse signal(SignalWorkflowExecutionRequest request) { Map tags = new ImmutableMap.Builder(1) .put(MetricsTag.SIGNAL_NAME, request.getSignalName()) .build(); Scope scope = metricsScope.tagged(tags); - grpcRetryer.retry( + return grpcRetryer.retryWithResult( () -> service .blockingStub() diff --git a/temporal-sdk/src/main/java/io/temporal/internal/nexus/InternalNexusOperationContext.java b/temporal-sdk/src/main/java/io/temporal/internal/nexus/InternalNexusOperationContext.java index d7306ea96..4d21366fb 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/nexus/InternalNexusOperationContext.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/nexus/InternalNexusOperationContext.java @@ -6,6 +6,9 @@ import io.temporal.common.interceptors.NexusOperationOutboundCallsInterceptor; import io.temporal.nexus.NexusOperationContext; import io.temporal.nexus.NexusOperationInfo; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; public class InternalNexusOperationContext { private final String namespace; @@ -15,6 +18,21 @@ public class InternalNexusOperationContext { private final WorkflowClient client; NexusOperationOutboundCallsInterceptor outboundCalls; Link startWorkflowResponseLink; + // Links extracted from the inbound Nexus task. Stored once at the task-handler boundary so the + // workflow client (signal, signalWithStart) can attach them to outgoing requests via + // SignalWorkflowExecutionRequest.links, matching the Go SDK's NexusOperationLinksKey ctx value. + private List nexusOperationLinks = Collections.emptyList(); + // Backlinks returned by SignalWorkflowExecutionResponse.link / + // SignalWithStartWorkflowExecutionResponse.signal_link. One entry per signal RPC issued from + // within the Nexus operation handler. Drained by the task handler when building + // StartOperationResponse so every signal the handler issues gets a corresponding link on the + // caller workflow's history event. + // + // NOTE: this context is only safe for use from the single thread that runs the operation + // handler (the Nexus task executor's thread). Handlers that spawn their own threads to issue + // signals will not see the thread-local context, so the links from those signals will not + // propagate. + private final List signalWorkflowResponseLinks = new ArrayList<>(); public InternalNexusOperationContext( String namespace, @@ -68,6 +86,35 @@ public Link getStartWorkflowResponseLink() { return startWorkflowResponseLink; } + /** + * Set the {@code common.v1.Link}s extracted from the inbound Nexus task so they can be attached + * to any signal RPCs issued by the operation handler. + */ + public void setNexusOperationLinks(List links) { + this.nexusOperationLinks = links == null ? Collections.emptyList() : links; + } + + /** Links from the inbound Nexus task; empty if none. Never null. */ + public List getNexusOperationLinks() { + return nexusOperationLinks; + } + + /** + * Append a backlink returned by a signal-class RPC (signal or signalWithStart). Each signal the + * operation handler issues should add one entry; the task handler drains the list when building + * the operation's StartOperationResponse. + */ + public void addSignalWorkflowResponseLink(Link link) { + if (link != null) { + this.signalWorkflowResponseLinks.add(link); + } + } + + /** Backlinks from every signal RPC issued by the handler. Never null; may be empty. */ + public List getSignalWorkflowResponseLinks() { + return signalWorkflowResponseLinks; + } + private class NexusOperationContextImpl implements NexusOperationContext { @Override public NexusOperationInfo getInfo() { diff --git a/temporal-sdk/src/main/java/io/temporal/internal/nexus/NexusTaskHandlerImpl.java b/temporal-sdk/src/main/java/io/temporal/internal/nexus/NexusTaskHandlerImpl.java index 7f10ba8c6..d601444b5 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/nexus/NexusTaskHandlerImpl.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/nexus/NexusTaskHandlerImpl.java @@ -20,6 +20,7 @@ import io.temporal.failure.CanceledFailure; import io.temporal.failure.TemporalFailure; import io.temporal.internal.common.InternalUtils; +import io.temporal.internal.common.LinkConverter; import io.temporal.internal.common.NexusUtil; import io.temporal.internal.worker.NexusTask; import io.temporal.internal.worker.NexusTaskHandler; @@ -284,6 +285,10 @@ private StartOperationResponse handleStartOperation( .setCallbackUrl(task.getCallback()) .setRequestId(task.getRequestId()); task.getCallbackHeaderMap().forEach(operationStartDetails::putCallbackHeader); + // Stash the inbound links in common.v1.Link form on the operation context so that signal + // RPCs issued by the handler (e.g. SignalWithStartWorkflow on the callee) can attach them + // to SignalWorkflowExecutionRequest.links. + List inboundCommonLinks = new ArrayList<>(); task.getLinksList() .forEach( link -> { @@ -296,7 +301,23 @@ private StartOperationResponse handleStartOperation( "Invalid link URL: " + link.getUrl(), e); } + // LinkConverter only returns a WorkflowEvent-shaped common.v1.Link; nexus links of + // other shapes (e.g. non-temporal URLs) come back null and are intentionally not + // forwarded onto SignalWorkflowExecutionRequest.links, which requires the + // WorkflowEvent variant. Log so a debugging session can see what was dropped. + io.temporal.api.common.v1.Link commonLink = + LinkConverter.nexusLinkToWorkflowEvent(link); + if (commonLink != null) { + inboundCommonLinks.add(commonLink); + } else { + log.warn( + "Dropping inbound Nexus link from outbound signal propagation: type='{}'," + + " url='{}' (not a parseable temporal WorkflowEvent link)", + link.getType(), + link.getUrl()); + } }); + CurrentNexusOperationContext.get().setNexusOperationLinks(inboundCommonLinks); HandlerInputContent.Builder input = HandlerInputContent.newBuilder().setDataStream(task.getPayload().toByteString().newInput()); @@ -307,10 +328,27 @@ private StartOperationResponse handleStartOperation( try { OperationStartResult result = startOperation(context, operationStartDetails.build(), input.build()); + // If signal/signalWithStart RPCs issued by the handler returned backlinks, propagate + // them to the caller so the caller workflow's history event links to each signal event + // on the callee. Same set of backlinks applies to both sync and async response variants. + List signalBacklinks = new ArrayList<>(); + for (io.temporal.api.common.v1.Link signalResponseLink : + CurrentNexusOperationContext.get().getSignalWorkflowResponseLinks()) { + if (!signalResponseLink.hasWorkflowEvent()) { + continue; + } + io.temporal.api.nexus.v1.Link converted = + LinkConverter.workflowEventToNexusLink(signalResponseLink.getWorkflowEvent()); + if (converted != null) { + signalBacklinks.add(converted); + } + } + if (result.isSync()) { startResponseBuilder.setSyncSuccess( StartOperationResponse.Sync.newBuilder() .setPayload(Payload.parseFrom(result.getSyncResult().getDataBytes())) + .addAllLinks(signalBacklinks) .build()); } else { startResponseBuilder.setAsyncSuccess( @@ -326,6 +364,7 @@ private StartOperationResponse handleStartOperation( .setUrl(link.getUri().toString()) .build()) .collect(Collectors.toList())) + .addAllLinks(signalBacklinks) .build()); } } catch (OperationException e) { diff --git a/temporal-sdk/src/test/java/io/temporal/internal/client/RootWorkflowClientInvokerLinkPropagationTest.java b/temporal-sdk/src/test/java/io/temporal/internal/client/RootWorkflowClientInvokerLinkPropagationTest.java new file mode 100644 index 000000000..cef0dbf8b --- /dev/null +++ b/temporal-sdk/src/test/java/io/temporal/internal/client/RootWorkflowClientInvokerLinkPropagationTest.java @@ -0,0 +1,177 @@ +package io.temporal.internal.client; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import com.uber.m3.tally.RootScopeBuilder; +import com.uber.m3.tally.Scope; +import io.temporal.api.common.v1.Link; +import io.temporal.api.common.v1.WorkflowExecution; +import io.temporal.api.enums.v1.EventType; +import io.temporal.api.workflowservice.v1.SignalWorkflowExecutionRequest; +import io.temporal.api.workflowservice.v1.SignalWorkflowExecutionResponse; +import io.temporal.client.WorkflowClient; +import io.temporal.client.WorkflowClientOptions; +import io.temporal.common.interceptors.Header; +import io.temporal.common.interceptors.WorkflowClientCallsInterceptor.WorkflowSignalInput; +import io.temporal.internal.client.external.GenericWorkflowClient; +import io.temporal.internal.nexus.CurrentNexusOperationContext; +import io.temporal.internal.nexus.InternalNexusOperationContext; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.mockito.ArgumentCaptor; + +/** + * Unit tests for {@link RootWorkflowClientInvoker#signal} link propagation in and out of the Nexus + * operation context. These run against mocked dependencies and exercise the code paths that the + * integration tests in {@code SignalOperationLinkingTest} can only cover when a real flag-enabled + * server is available. + */ +public class RootWorkflowClientInvokerLinkPropagationTest { + + private static final String NAMESPACE = "test-namespace"; + private static final String WORKFLOW_ID = "wf-target"; + + private GenericWorkflowClient genericClient; + private RootWorkflowClientInvoker invoker; + private InternalNexusOperationContext nexusCtx; + + @Before + public void setUp() { + genericClient = mock(GenericWorkflowClient.class); + invoker = + new RootWorkflowClientInvoker( + genericClient, + WorkflowClientOptions.newBuilder() + .setNamespace(NAMESPACE) + .validateAndBuildWithDefaults(), + new WorkerFactoryRegistry()); + Scope metricsScope = new RootScopeBuilder().reportEvery(com.uber.m3.util.Duration.ofMillis(10)); + nexusCtx = + new InternalNexusOperationContext( + NAMESPACE, "tq", "endpoint", metricsScope, mock(WorkflowClient.class)); + CurrentNexusOperationContext.set(nexusCtx); + } + + @After + public void tearDown() { + CurrentNexusOperationContext.unset(); + } + + /** + * Happy path against a flag-enabled server: inbound nexus links are forwarded onto the + * SignalWorkflowExecutionRequest, and the response's backlink is captured back onto the operation + * context. + */ + @Test + public void signalForwardsInboundLinksAndCapturesResponseBacklink() { + Link inboundLink = + workflowEventLink( + "caller-wf", "caller-run", EventType.EVENT_TYPE_NEXUS_OPERATION_SCHEDULED); + nexusCtx.setNexusOperationLinks(Collections.singletonList(inboundLink)); + + Link responseLink = + workflowEventLink( + WORKFLOW_ID, "target-run", EventType.EVENT_TYPE_WORKFLOW_EXECUTION_SIGNALED); + SignalWorkflowExecutionResponse response = + SignalWorkflowExecutionResponse.newBuilder().setLink(responseLink).build(); + when(genericClient.signal(any(SignalWorkflowExecutionRequest.class))).thenReturn(response); + + invoker.signal(newSignalInput()); + + // Forward direction: the request the SDK sent carries the inbound link. + ArgumentCaptor captor = + ArgumentCaptor.forClass(SignalWorkflowExecutionRequest.class); + org.mockito.Mockito.verify(genericClient).signal(captor.capture()); + SignalWorkflowExecutionRequest sent = captor.getValue(); + Assert.assertEquals("request should carry the single inbound link", 1, sent.getLinksCount()); + Assert.assertEquals(inboundLink, sent.getLinks(0)); + + // Backward direction: the response's link is now on the context for the task handler to read. + List captured = nexusCtx.getSignalWorkflowResponseLinks(); + Assert.assertEquals("expected one captured backlink", 1, captured.size()); + Assert.assertEquals(responseLink, captured.get(0)); + } + + /** + * Older-server compatibility: the server returns a response without {@code link} set. The SDK + * must not crash and must leave the operation context's backlink list empty. + */ + @Test + public void signalAgainstOlderServerCapturesNoBacklink() { + Link inboundLink = + workflowEventLink( + "caller-wf", "caller-run", EventType.EVENT_TYPE_NEXUS_OPERATION_SCHEDULED); + nexusCtx.setNexusOperationLinks(Collections.singletonList(inboundLink)); + + // Pre-1.31 server / flag-off server: response has no link. + SignalWorkflowExecutionResponse response = SignalWorkflowExecutionResponse.getDefaultInstance(); + when(genericClient.signal(any(SignalWorkflowExecutionRequest.class))).thenReturn(response); + + invoker.signal(newSignalInput()); + + // Forward direction still works regardless of server version. + ArgumentCaptor captor = + ArgumentCaptor.forClass(SignalWorkflowExecutionRequest.class); + org.mockito.Mockito.verify(genericClient).signal(captor.capture()); + Assert.assertEquals(1, captor.getValue().getLinksCount()); + + // Backward direction: no backlink captured because the server didn't send one. + Assert.assertTrue( + "expected no captured backlink when server returned no link", + nexusCtx.getSignalWorkflowResponseLinks().isEmpty()); + } + + /** + * Multi-signal: two signal RPCs in a row each contribute a backlink; both must be captured in + * order on the context, ready for the task handler to drain into the operation response. + */ + @Test + public void multipleSignalsAccumulateAllBacklinks() { + Link firstResponseLink = + workflowEventLink("callee-a", "run-a", EventType.EVENT_TYPE_WORKFLOW_EXECUTION_SIGNALED); + Link secondResponseLink = + workflowEventLink("callee-b", "run-b", EventType.EVENT_TYPE_WORKFLOW_EXECUTION_SIGNALED); + when(genericClient.signal(any(SignalWorkflowExecutionRequest.class))) + .thenReturn(SignalWorkflowExecutionResponse.newBuilder().setLink(firstResponseLink).build()) + .thenReturn( + SignalWorkflowExecutionResponse.newBuilder().setLink(secondResponseLink).build()); + + invoker.signal(newSignalInput()); + invoker.signal(newSignalInput()); + + List captured = nexusCtx.getSignalWorkflowResponseLinks(); + Assert.assertEquals( + "expected one backlink per signal call", + Arrays.asList(firstResponseLink, secondResponseLink), + captured); + } + + // ── helpers ────────────────────────────────────────────────────────────────────────────── + + private static WorkflowSignalInput newSignalInput() { + return new WorkflowSignalInput( + WorkflowExecution.newBuilder().setWorkflowId(WORKFLOW_ID).build(), + "test-signal", + Header.empty(), + new Object[] {"payload"}); + } + + private static Link workflowEventLink(String workflowId, String runId, EventType eventType) { + return Link.newBuilder() + .setWorkflowEvent( + Link.WorkflowEvent.newBuilder() + .setNamespace(NAMESPACE) + .setWorkflowId(workflowId) + .setRunId(runId) + .setEventRef( + Link.WorkflowEvent.EventReference.newBuilder().setEventType(eventType))) + .build(); + } +} diff --git a/temporal-sdk/src/test/java/io/temporal/internal/nexus/NexusTaskHandlerImplTest.java b/temporal-sdk/src/test/java/io/temporal/internal/nexus/NexusTaskHandlerImplTest.java index ad7c628e9..970a98b9d 100644 --- a/temporal-sdk/src/test/java/io/temporal/internal/nexus/NexusTaskHandlerImplTest.java +++ b/temporal-sdk/src/test/java/io/temporal/internal/nexus/NexusTaskHandlerImplTest.java @@ -8,9 +8,12 @@ import com.uber.m3.util.Duration; import io.nexusrpc.Header; import io.nexusrpc.handler.*; +import io.temporal.api.common.v1.Link; import io.temporal.api.common.v1.Payload; +import io.temporal.api.enums.v1.EventType; import io.temporal.api.nexus.v1.Request; import io.temporal.api.nexus.v1.StartOperationRequest; +import io.temporal.api.nexus.v1.StartOperationResponse; import io.temporal.api.workflowservice.v1.PollNexusTaskQueueResponse; import io.temporal.client.WorkflowClient; import io.temporal.common.converter.DataConverter; @@ -157,6 +160,83 @@ public void startAsyncSyncOperation() throws TimeoutException { "test id", result.getResponse().getStartOperation().getAsyncSuccess().getOperationToken()); } + /** + * Verify that signal-response backlinks stashed on the {@link InternalNexusOperationContext} + * during a handler invocation are merged into the resulting {@code StartOperationResponse.Async} + * via {@link io.temporal.internal.common.LinkConverter}. No server required. + */ + @Test + public void asyncResponseIncludesSignalBacklinks() throws TimeoutException { + WorkflowClient client = mock(WorkflowClient.class); + NexusTaskHandlerImpl nexusTaskHandlerImpl = + new NexusTaskHandlerImpl( + client, NAMESPACE, TASK_QUEUE, dataConverter, new WorkerInterceptor[] {}); + nexusTaskHandlerImpl.registerNexusServiceImplementations( + new Object[] {new BacklinkStashingAsyncServiceImpl()}); + nexusTaskHandlerImpl.start(); + + PollNexusTaskQueueResponse.Builder task = + PollNexusTaskQueueResponse.newBuilder() + .setRequest( + Request.newBuilder() + .setStartOperation( + StartOperationRequest.newBuilder() + .setOperation("operation") + .setService("TestNexusService1") + .setPayload(dataConverter.toPayload("op-token").get()) + .build())); + + NexusTaskHandler.Result result = + nexusTaskHandlerImpl.handle(new NexusTask(task, null, null), metricsScope); + + Assert.assertNull(result.getHandlerException()); + StartOperationResponse.Async async = result.getResponse().getStartOperation().getAsyncSuccess(); + Assert.assertEquals("op-token", async.getOperationToken()); + Assert.assertEquals( + "expected one signal backlink on the async response", 1, async.getLinksCount()); + // The backlink was stashed as a WorkflowEvent for callee workflowId "callee-wf"; the response + // should contain a temporal:// URL referencing that workflow. + Assert.assertTrue( + "expected backlink URL to reference the callee workflow, got: " + + async.getLinks(0).getUrl(), + async.getLinks(0).getUrl().contains("callee-wf")); + } + + /** + * Handler that simulates what a real Nexus operation would do after issuing a signal: stash a + * backlink on the operation context, then return an async result. Lets us exercise the + * async-response link merge in {@link NexusTaskHandlerImpl} without standing up a real signal + * RPC. + */ + @ServiceImpl(service = TestNexusServices.TestNexusService1.class) + public class BacklinkStashingAsyncServiceImpl { + @OperationImpl + public OperationHandler operation() { + return new OperationHandler() { + @Override + public OperationStartResult start( + OperationContext ctx, OperationStartDetails details, @Nullable String token) { + Link backlink = + Link.newBuilder() + .setWorkflowEvent( + Link.WorkflowEvent.newBuilder() + .setNamespace(NAMESPACE) + .setWorkflowId("callee-wf") + .setRunId("callee-run-id") + .setEventRef( + Link.WorkflowEvent.EventReference.newBuilder() + .setEventType(EventType.EVENT_TYPE_WORKFLOW_EXECUTION_SIGNALED))) + .build(); + CurrentNexusOperationContext.get().addSignalWorkflowResponseLink(backlink); + return OperationStartResult.async(token); + } + + @Override + public void cancel(OperationContext ctx, OperationCancelDetails details) {} + }; + } + } + @ServiceImpl(service = TestNexusServices.TestNexusService1.class) public class TestNexusServiceImpl { @OperationImpl diff --git a/temporal-sdk/src/test/java/io/temporal/workflow/nexus/SignalOperationLinkingTest.java b/temporal-sdk/src/test/java/io/temporal/workflow/nexus/SignalOperationLinkingTest.java new file mode 100644 index 000000000..1e071106b --- /dev/null +++ b/temporal-sdk/src/test/java/io/temporal/workflow/nexus/SignalOperationLinkingTest.java @@ -0,0 +1,517 @@ +package io.temporal.workflow.nexus; + +import static org.junit.Assume.assumeTrue; + +import com.google.protobuf.util.Durations; +import io.grpc.Status; +import io.grpc.StatusRuntimeException; +import io.nexusrpc.handler.OperationCancelDetails; +import io.nexusrpc.handler.OperationContext; +import io.nexusrpc.handler.OperationHandler; +import io.nexusrpc.handler.OperationImpl; +import io.nexusrpc.handler.OperationStartDetails; +import io.nexusrpc.handler.OperationStartResult; +import io.nexusrpc.handler.ServiceImpl; +import io.temporal.api.enums.v1.EventType; +import io.temporal.api.history.v1.History; +import io.temporal.api.history.v1.HistoryEvent; +import io.temporal.api.workflowservice.v1.RegisterNamespaceRequest; +import io.temporal.client.BatchRequest; +import io.temporal.client.WorkflowClient; +import io.temporal.client.WorkflowClientOptions; +import io.temporal.client.WorkflowOptions; +import io.temporal.client.WorkflowStub; +import io.temporal.nexus.Nexus; +import io.temporal.serviceclient.WorkflowServiceStubs; +import io.temporal.serviceclient.WorkflowServiceStubsOptions; +import io.temporal.testing.internal.SDKTestWorkflowRule; +import io.temporal.workflow.NexusOperationHandle; +import io.temporal.workflow.NexusOperationOptions; +import io.temporal.workflow.NexusServiceOptions; +import io.temporal.workflow.SignalMethod; +import io.temporal.workflow.Workflow; +import io.temporal.workflow.WorkflowInterface; +import io.temporal.workflow.WorkflowMethod; +import io.temporal.workflow.shared.TestNexusServices; +import io.temporal.workflow.shared.TestWorkflows; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.UUID; +import javax.annotation.Nullable; +import org.junit.After; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; + +/** + * Verifies link propagation in both directions when a Nexus operation handler interacts with a + * workflow via signal. Covers four scenarios: + * + *
    + *
  • {@link #testSignalOperationLinks()} — same-namespace, sync handler, two signals + * (signalWithStart + plain signal). + *
  • {@link #testCrossNamespaceSignalOperationLinks()} — caller and callee in different + * namespaces; otherwise identical to the same-namespace case. + *
  • {@link #testMultiSignalOperationLinks()} — one Nexus operation signals three different + * callees; verifies all three backlinks land on the caller's single {@code + * NexusOperationCompleted} event. + *
  • {@link #testAsyncSignalOperationLinks()} — handler returns an async result after signaling; + * verifies the backlink lands on {@code NexusOperationStarted} (the async response path in + * {@link io.temporal.internal.nexus.NexusTaskHandlerImpl}). + *
+ * + *

All four tests require Temporal server ≥ 1.31 with {@code EnableCHASMSignalBacklinks=true}; + * the in-memory test server does not implement this path so the class is skipped unless a real + * server is in use. + */ +public class SignalOperationLinkingTest extends BaseNexusTest { + + private static final String MODE_SIGNAL_WITH_START = "signalWithStart"; + private static final String MODE_SIGNAL = "signal"; + private static final String MODE_MULTI_SIGNAL_WITH_START = "multi"; + private static final String MODE_ASYNC_SIGNAL_WITH_START = "asyncSignalWithStart"; + private static final String CALLEE_NAMESPACE = "UnitTest2"; + + // Caller workflow + Nexus handler register here (namespace UnitTest). + @Rule + public SDKTestWorkflowRule testWorkflowRule = + SDKTestWorkflowRule.newBuilder() + .setWorkflowTypes(SignalCallerWorkflow.class, SignalCalleeWorkflowImpl.class) + .setNexusServiceImplementation(new SignalingNexusServiceImpl()) + .build(); + + // Separate worker/client on the callee namespace, used by the cross-namespace test. No + // precedent in the repo for multi-@Rule SDKTestWorkflowRule patterns; every test method pays + // the cost of starting this second worker even if it doesn't use it. Acceptable for the + // current test count; revisit if more cross-namespace tests get added. + @Rule + public SDKTestWorkflowRule calleeNamespaceRule = + SDKTestWorkflowRule.newBuilder() + .setNamespace(CALLEE_NAMESPACE) + .setWorkflowTypes(SignalCalleeWorkflowImpl.class) + .build(); + + @BeforeClass + public static void requireExternalServiceAndSetupCalleeNamespace() { + // The server-side backlink implementation (temporalio/temporal#9897) is gated by + // EnableCHASMSignalBacklinks and is only present in real servers. + assumeTrue( + "signal backlinks require a real server with EnableCHASMSignalBacklinks=true", + SDKTestWorkflowRule.useExternalService); + // The test rule does not auto-register namespaces on an external server. + ensureNamespaceExists(CALLEE_NAMESPACE); + } + + @After + public void resetNamespaceOverrides() { + SignalingNexusServiceImpl.calleeNamespaceOverride = null; + SignalingNexusServiceImpl.calleeTaskQueueOverride = null; + } + + @Override + protected SDKTestWorkflowRule getTestWorkflowRule() { + return testWorkflowRule; + } + + // ── Tests ──────────────────────────────────────────────────────────────────────────────── + + @Test + public void testSignalOperationLinks() { + runTwoSignalScenario(testWorkflowRule); + } + + @Test + public void testCrossNamespaceSignalOperationLinks() { + SignalingNexusServiceImpl.calleeNamespaceOverride = CALLEE_NAMESPACE; + SignalingNexusServiceImpl.calleeTaskQueueOverride = calleeNamespaceRule.getTaskQueue(); + runTwoSignalScenario(calleeNamespaceRule); + } + + /** + * One Nexus operation signals three different callees. The handler's three signal-class RPCs each + * contribute a backlink and all three end up on the caller's single {@code + * NexusOperationCompleted} event. + */ + @Test + public void testMultiSignalOperationLinks() { + WorkflowClient client = testWorkflowRule.getWorkflowClient(); + List calleeIds = + Arrays.asList( + "multi-callee-a-" + UUID.randomUUID(), + "multi-callee-b-" + UUID.randomUUID(), + "multi-callee-c-" + UUID.randomUUID()); + + TestWorkflows.TestWorkflow1 callerStub = + testWorkflowRule.newWorkflowStubTimeoutOptions(TestWorkflows.TestWorkflow1.class, "caller"); + String result = + callerStub.execute(MODE_MULTI_SIGNAL_WITH_START + ":" + String.join(",", calleeIds)); + Assert.assertEquals("ok:multi:" + String.join(",", calleeIds), result); + + // Each callee gets one signal and completes. + for (String calleeId : calleeIds) { + String calleeResult = client.newUntypedWorkflowStub(calleeId).getResult(String.class); + Assert.assertEquals("multi-signal", calleeResult); + } + + String callerWorkflowId = WorkflowStub.fromTyped(callerStub).getExecution().getWorkflowId(); + History callerHistory = client.fetchHistory(callerWorkflowId).getHistory(); + + // Caller → each callee: forward links on every callee's WorkflowExecutionSignaled event. + for (String calleeId : calleeIds) { + History calleeHistory = client.fetchHistory(calleeId).getHistory(); + assertForwardLinks(calleeHistory, callerWorkflowId, /* expectedCount= */ 1); + } + + // Callee → caller: the single NexusOperationCompleted carries one backlink per callee. + List completedEvents = + getAllEventsOfType(callerHistory, EventType.EVENT_TYPE_NEXUS_OPERATION_COMPLETED); + Assert.assertEquals( + "expected exactly one NexusOperationCompleted event", 1, completedEvents.size()); + HistoryEvent completed = completedEvents.get(0); + Assert.assertEquals( + "expected one backlink per signaled callee", calleeIds.size(), completed.getLinksCount()); + List backlinkWorkflowIds = new ArrayList<>(); + for (int i = 0; i < completed.getLinksCount(); i++) { + io.temporal.api.common.v1.Link.WorkflowEvent backlink = + completed.getLinks(i).getWorkflowEvent(); + backlinkWorkflowIds.add(backlink.getWorkflowId()); + EventType backlinkEventType = + backlink.hasRequestIdRef() + ? backlink.getRequestIdRef().getEventType() + : backlink.getEventRef().getEventType(); + Assert.assertEquals(EventType.EVENT_TYPE_WORKFLOW_EXECUTION_SIGNALED, backlinkEventType); + } + Assert.assertTrue( + "expected backlinks to reference all three callees: " + backlinkWorkflowIds, + backlinkWorkflowIds.containsAll(calleeIds)); + } + + /** + * Async response path: handler signals the callee then returns an async result. Verifies that the + * backlink lands on {@code NexusOperationStarted} (the async branch in NexusTaskHandlerImpl) + * rather than on {@code NexusOperationCompleted}. + */ + @Test + public void testAsyncSignalOperationLinks() { + WorkflowClient client = testWorkflowRule.getWorkflowClient(); + String calleeWorkflowId = "async-callee-" + UUID.randomUUID(); + + TestWorkflows.TestWorkflow1 callerStub = + testWorkflowRule.newWorkflowStubTimeoutOptions(TestWorkflows.TestWorkflow1.class, "caller"); + String result = callerStub.execute(MODE_ASYNC_SIGNAL_WITH_START + ":" + calleeWorkflowId); + Assert.assertEquals("async-started", result); + + String calleeResult = client.newUntypedWorkflowStub(calleeWorkflowId).getResult(String.class); + Assert.assertEquals("async-signal", calleeResult); + + String callerWorkflowId = WorkflowStub.fromTyped(callerStub).getExecution().getWorkflowId(); + History callerHistory = client.fetchHistory(callerWorkflowId).getHistory(); + History calleeHistory = client.fetchHistory(calleeWorkflowId).getHistory(); + + assertForwardLinks(calleeHistory, callerWorkflowId, /* expectedCount= */ 1); + + // Backward direction lands on NexusOperationStarted for the async response path. + List startedEvents = + getAllEventsOfType(callerHistory, EventType.EVENT_TYPE_NEXUS_OPERATION_STARTED); + Assert.assertEquals( + "expected exactly one NexusOperationStarted event for the async op", + 1, + startedEvents.size()); + assertBacklink(startedEvents.get(0), calleeWorkflowId); + } + + // ── Shared scenario + assertion helpers ────────────────────────────────────────────────── + + /** + * Drive the two-signal flow (signalWithStart + plain signal) and assert link propagation. Used by + * same-namespace and cross-namespace tests; the only thing that varies is which rule's client + * fetches the callee history. + */ + private void runTwoSignalScenario(SDKTestWorkflowRule calleeRule) { + WorkflowClient callerClient = testWorkflowRule.getWorkflowClient(); + WorkflowClient calleeClient = calleeRule.getWorkflowClient(); + String calleeWorkflowId = "signal-callee-" + UUID.randomUUID(); + + TestWorkflows.TestWorkflow1 callerStub = + testWorkflowRule.newWorkflowStubTimeoutOptions(TestWorkflows.TestWorkflow1.class, "caller"); + String result = callerStub.execute("twoSync:" + calleeWorkflowId); + Assert.assertEquals("ok:signalWithStart|ok:signal", result); + + String calleeResult = + calleeClient.newUntypedWorkflowStub(calleeWorkflowId).getResult(String.class); + Assert.assertEquals("first,second", calleeResult); + + String callerWorkflowId = WorkflowStub.fromTyped(callerStub).getExecution().getWorkflowId(); + History callerHistory = callerClient.fetchHistory(callerWorkflowId).getHistory(); + History calleeHistory = calleeClient.fetchHistory(calleeWorkflowId).getHistory(); + + assertForwardLinks(calleeHistory, callerWorkflowId, /* expectedCount= */ 2); + + List completedEvents = + getAllEventsOfType(callerHistory, EventType.EVENT_TYPE_NEXUS_OPERATION_COMPLETED); + Assert.assertEquals( + "expected two NexusOperationCompleted events on the caller", 2, completedEvents.size()); + for (HistoryEvent completed : completedEvents) { + assertBacklink(completed, calleeWorkflowId); + } + } + + /** + * Assert that the callee history has {@code expectedCount} {@code WorkflowExecutionSignaled} + * events, each linked back to the caller's {@code NexusOperationScheduled} event. + */ + private static void assertForwardLinks( + History calleeHistory, String callerWorkflowId, int expectedCount) { + List signaledEvents = + getAllEventsOfType(calleeHistory, EventType.EVENT_TYPE_WORKFLOW_EXECUTION_SIGNALED); + Assert.assertEquals( + "expected " + expectedCount + " WorkflowExecutionSignaled events on the callee", + expectedCount, + signaledEvents.size()); + for (HistoryEvent signaled : signaledEvents) { + Assert.assertTrue( + "expected at least one link on each WorkflowExecutionSignaled event", + signaled.getLinksCount() >= 1); + Assert.assertEquals( + "signaled-event link should reference the caller workflow", + callerWorkflowId, + signaled.getLinks(0).getWorkflowEvent().getWorkflowId()); + Assert.assertEquals( + EventType.EVENT_TYPE_NEXUS_OPERATION_SCHEDULED, + signaled.getLinks(0).getWorkflowEvent().getEventRef().getEventType()); + } + } + + /** + * Assert that a single caller-side event ({@code NexusOperationCompleted} or {@code + * NexusOperationStarted}) carries a backlink to the callee's {@code WorkflowExecutionSignaled} + * event. Server PR #9897 keys these via {@code RequestIdReference} rather than {@code + * EventReference}, so we accept either oneof variant. + */ + private static void assertBacklink(HistoryEvent event, String calleeWorkflowId) { + Assert.assertTrue( + "expected a signal-event backlink on " + event.getEventType().name(), + event.getLinksCount() >= 1); + io.temporal.api.common.v1.Link.WorkflowEvent backlink = event.getLinks(0).getWorkflowEvent(); + Assert.assertEquals(calleeWorkflowId, backlink.getWorkflowId()); + EventType backlinkEventType = + backlink.hasRequestIdRef() + ? backlink.getRequestIdRef().getEventType() + : backlink.getEventRef().getEventType(); + Assert.assertEquals(EventType.EVENT_TYPE_WORKFLOW_EXECUTION_SIGNALED, backlinkEventType); + } + + /** Find all history events of a given type, in order. */ + private static List getAllEventsOfType(History history, EventType type) { + List out = new ArrayList<>(); + for (HistoryEvent e : history.getEventsList()) { + if (e.getEventType() == type) { + out.add(e); + } + } + return out; + } + + /** + * Register {@code namespace} on the external server if it doesn't already exist. Honours the + * {@code TEMPORAL_SERVICE_ADDRESS} env var the same way {@code + * io.temporal.testing.internal.ExternalServiceTestConfigurator} does, so the test works against + * whichever server the test rule itself connects to. + */ + private static void ensureNamespaceExists(String namespace) { + String target = System.getenv("TEMPORAL_SERVICE_ADDRESS"); + WorkflowServiceStubsOptions.Builder optionsBuilder = WorkflowServiceStubsOptions.newBuilder(); + if (target != null && !target.isEmpty()) { + optionsBuilder.setTarget(target); + } + WorkflowServiceStubs stubs = WorkflowServiceStubs.newServiceStubs(optionsBuilder.build()); + try { + stubs + .blockingStub() + .registerNamespace( + RegisterNamespaceRequest.newBuilder() + .setNamespace(namespace) + .setWorkflowExecutionRetentionPeriod(Durations.fromHours(24)) + .build()); + } catch (StatusRuntimeException e) { + if (e.getStatus().getCode() != Status.Code.ALREADY_EXISTS) { + throw e; + } + } finally { + stubs.shutdownNow(); + } + } + + // ── Workflows ──────────────────────────────────────────────────────────────────────────── + + /** + * Caller workflow. Branches on a mode prefix in the input: + * + *

    + *
  • {@code twoSync:} — invoke the nexus op twice synchronously (signalWithStart, + * then signal). + *
  • {@code multi:,,} — invoke the nexus op once synchronously; handler + * signalWithStart's each id. + *
  • {@code asyncSignalWithStart:} — invoke the nexus op asynchronously via {@code + * Workflow.startNexusOperation}; wait for execution start and return without waiting for + * the operation result. + *
+ */ + public static class SignalCallerWorkflow implements TestWorkflows.TestWorkflow1 { + @Override + public String execute(String input) { + String[] parts = input.split(":", 2); + String mode = parts[0]; + String rest = parts[1]; + + TestNexusServices.TestNexusService1 stub = + Workflow.newNexusServiceStub( + TestNexusServices.TestNexusService1.class, + NexusServiceOptions.newBuilder() + .setEndpoint(getEndpointName()) + .setOperationOptions( + NexusOperationOptions.newBuilder() + .setScheduleToCloseTimeout(Duration.ofSeconds(30)) + .build()) + .build()); + + switch (mode) { + case "twoSync": + { + String r1 = stub.operation(MODE_SIGNAL_WITH_START + ":" + rest); + String r2 = stub.operation(MODE_SIGNAL + ":" + rest); + return r1 + "|" + r2; + } + case MODE_MULTI_SIGNAL_WITH_START: + return stub.operation(MODE_MULTI_SIGNAL_WITH_START + ":" + rest); + case MODE_ASYNC_SIGNAL_WITH_START: + { + NexusOperationHandle h = + Workflow.startNexusOperation( + stub::operation, MODE_ASYNC_SIGNAL_WITH_START + ":" + rest); + // Wait for the async op to be Started (the event that carries the backlink) but + // not for its eventual result — the async op completes outside this workflow. + h.getExecution().get(); + return "async-started"; + } + default: + throw new IllegalArgumentException("unknown mode: " + mode); + } + } + } + + /** Callee workflow. Awaits {@code expectedSignals} signals then returns their joined payloads. */ + @WorkflowInterface + public interface SignalCalleeWorkflow { + @WorkflowMethod + String execute(int expectedSignals); + + @SignalMethod + void ping(String msg); + } + + public static class SignalCalleeWorkflowImpl implements SignalCalleeWorkflow { + private final List received = new ArrayList<>(); + + @Override + public String execute(int expectedSignals) { + Workflow.await(() -> received.size() >= expectedSignals); + return String.join(",", received); + } + + @Override + public void ping(String msg) { + received.add(msg); + } + } + + // ── Nexus service ──────────────────────────────────────────────────────────────────────── + + /** + * Single Nexus operation that dispatches based on a mode prefix in its input. Supports sync and + * async return shapes and an optional namespace override for cross-namespace tests. + */ + @ServiceImpl(service = TestNexusServices.TestNexusService1.class) + public static class SignalingNexusServiceImpl { + static volatile String calleeNamespaceOverride; + static volatile String calleeTaskQueueOverride; + + @OperationImpl + public OperationHandler operation() { + return new OperationHandler() { + @Override + public OperationStartResult start( + OperationContext ctx, OperationStartDetails details, @Nullable String input) { + String[] parts = input.split(":", 2); + String mode = parts[0]; + String rest = parts[1]; + + io.temporal.nexus.NexusOperationContext opCtx = Nexus.getOperationContext(); + WorkflowClient ambient = opCtx.getWorkflowClient(); + WorkflowClient calleeClient = + calleeNamespaceOverride == null + ? ambient + : WorkflowClient.newInstance( + ambient.getWorkflowServiceStubs(), + WorkflowClientOptions.newBuilder() + .setNamespace(calleeNamespaceOverride) + .build()); + String taskQueue = + calleeTaskQueueOverride != null + ? calleeTaskQueueOverride + : opCtx.getInfo().getTaskQueue(); + + switch (mode) { + case MODE_SIGNAL_WITH_START: + signalWithStart(calleeClient, rest, taskQueue, /* expectedSignals= */ 2, "first"); + return OperationStartResult.sync("ok:" + MODE_SIGNAL_WITH_START); + case MODE_SIGNAL: + calleeClient.newWorkflowStub(SignalCalleeWorkflow.class, rest).ping("second"); + return OperationStartResult.sync("ok:" + MODE_SIGNAL); + case MODE_MULTI_SIGNAL_WITH_START: + for (String id : rest.split(",")) { + signalWithStart( + calleeClient, id, taskQueue, /* expectedSignals= */ 1, "multi-signal"); + } + return OperationStartResult.sync("ok:multi:" + rest); + case MODE_ASYNC_SIGNAL_WITH_START: + signalWithStart( + calleeClient, rest, taskQueue, /* expectedSignals= */ 1, "async-signal"); + // Async branch in NexusTaskHandlerImpl. The caller never waits for completion, so + // the token is opaque. + return OperationStartResult.async("async-op-" + UUID.randomUUID()); + default: + throw new IllegalArgumentException("unknown mode: " + mode); + } + } + + @Override + public void cancel(OperationContext ctx, OperationCancelDetails details) { + // Not exercised in these tests. + } + }; + } + + private static void signalWithStart( + WorkflowClient client, + String calleeWorkflowId, + String taskQueue, + int expectedSignals, + String signalPayload) { + SignalCalleeWorkflow startStub = + client.newWorkflowStub( + SignalCalleeWorkflow.class, + WorkflowOptions.newBuilder() + .setWorkflowId(calleeWorkflowId) + .setTaskQueue(taskQueue) + .build()); + BatchRequest batch = client.newSignalWithStartRequest(); + batch.add(startStub::execute, expectedSignals); + batch.add(startStub::ping, signalPayload); + client.signalWithStart(batch); + } + } +} diff --git a/temporal-serviceclient/src/main/proto b/temporal-serviceclient/src/main/proto index 67150b14e..803029f9c 160000 --- a/temporal-serviceclient/src/main/proto +++ b/temporal-serviceclient/src/main/proto @@ -1 +1 @@ -Subproject commit 67150b14e0509210bf250960bd3278a4509e091c +Subproject commit 803029f9cfb905e23341d470f02aec8e7ef373d0 diff --git a/temporal-test-server/src/main/java/io/temporal/internal/testservice/TestWorkflowMutableStateImpl.java b/temporal-test-server/src/main/java/io/temporal/internal/testservice/TestWorkflowMutableStateImpl.java index a1cf4e111..f28075db6 100644 --- a/temporal-test-server/src/main/java/io/temporal/internal/testservice/TestWorkflowMutableStateImpl.java +++ b/temporal-test-server/src/main/java/io/temporal/internal/testservice/TestWorkflowMutableStateImpl.java @@ -37,6 +37,7 @@ import io.temporal.api.taskqueue.v1.StickyExecutionAttributes; import io.temporal.api.update.v1.*; import io.temporal.api.workflow.v1.*; +import io.temporal.api.workflow.v1.OnConflictOptions; import io.temporal.api.workflowservice.v1.*; import io.temporal.common.converter.DefaultDataConverter; import io.temporal.failure.ServerFailure;