diff --git a/nexus_messaging/README.md b/nexus_messaging/README.md new file mode 100644 index 00000000..22670572 --- /dev/null +++ b/nexus_messaging/README.md @@ -0,0 +1,16 @@ +This sample shows how to expose a long-running workflow's queries, updates, and signals as Nexus +operations. There are two self-contained examples, each in its own directory: + +| | `callerpattern/` | `ondemandpattern/` | +|---|---|---| +| **Pattern** | Signal an existing workflow | Create and run workflows on demand, and send signals to them | +| **Who creates the workflow?** | The handler worker starts it on boot | The caller starts it via a Nexus operation | +| **Who knows the workflow ID?** | Only the handler | The caller chooses and passes it in every operation | +| **Nexus service** | `NexusGreetingService` | `NexusRemoteGreetingService` | + +Each directory is fully self-contained for clarity. The `GreetingWorkflow`, activity, and +`Language` enum are **identical** between the two -- only the Nexus service definition and its +handler implementation differ. This highlights that the same workflow can be exposed through +Nexus in different ways depending on whether the caller needs lifecycle control. + +See each directory's README for running instructions. diff --git a/nexus_sync_operations/__init__.py b/nexus_messaging/__init__.py similarity index 100% rename from nexus_sync_operations/__init__.py rename to nexus_messaging/__init__.py diff --git a/nexus_messaging/callerpattern/README.md b/nexus_messaging/callerpattern/README.md new file mode 100644 index 00000000..d4e73e04 --- /dev/null +++ b/nexus_messaging/callerpattern/README.md @@ -0,0 +1,56 @@ +## Caller pattern + +The handler worker starts a `GreetingWorkflow` for a user ID. +`NexusGreetingServiceHandler` holds that ID and routes every Nexus operation to it. +The caller's input does not have that workflow ID as the caller doesn't know it -- but the caller +sends in the User ID, and `NexusGreetingServiceHandler` knows how to get the desired workflow ID +from that User ID (see the `get_workflow_id` call). + +The handler worker uses the same `get_workflow_id` call to generate a workflow ID from a user ID +when it launches the workflow. + +The caller workflow: +1. Queries for supported languages (`get_languages` -- backed by a `@workflow.query`) +2. Changes the language to Arabic (`set_language` -- backed by a `@workflow.update` that calls an activity) +3. Confirms the change via a second query (`get_language`) +4. Approves the workflow (`approve` -- backed by a `@workflow.signal`) + +### Running + +Start a Temporal server: + +```bash +temporal server start-dev +``` + +Create the namespaces and Nexus endpoint: + +```bash +temporal operator namespace create --namespace nexus-messaging-handler-namespace +temporal operator namespace create --namespace nexus-messaging-caller-namespace + +temporal operator nexus endpoint create \ + --name nexus-messaging-nexus-endpoint \ + --target-namespace nexus-messaging-handler-namespace \ + --target-task-queue nexus-messaging-handler-task-queue +``` + +In one terminal, start the handler worker: + +```bash +uv run python -m nexus_messaging.callerpattern.handler.worker +``` + +In another terminal, run the caller workflow: + +```bash +uv run python -m nexus_messaging.callerpattern.caller.app +``` + +Expected output: + +``` +Supported languages: [, ] +Language changed: ENGLISH -> ARABIC +Workflow approved +``` diff --git a/nexus_sync_operations/caller/__init__.py b/nexus_messaging/callerpattern/__init__.py similarity index 100% rename from nexus_sync_operations/caller/__init__.py rename to nexus_messaging/callerpattern/__init__.py diff --git a/nexus_sync_operations/handler/__init__.py b/nexus_messaging/callerpattern/caller/__init__.py similarity index 100% rename from nexus_sync_operations/handler/__init__.py rename to nexus_messaging/callerpattern/caller/__init__.py diff --git a/nexus_sync_operations/caller/app.py b/nexus_messaging/callerpattern/caller/app.py similarity index 73% rename from nexus_sync_operations/caller/app.py rename to nexus_messaging/callerpattern/caller/app.py index 375628d2..933dcd5d 100644 --- a/nexus_sync_operations/caller/app.py +++ b/nexus_messaging/callerpattern/caller/app.py @@ -6,15 +6,13 @@ from temporalio.envconfig import ClientConfig from temporalio.worker import Worker -from nexus_sync_operations.caller.workflows import CallerWorkflow +from nexus_messaging.callerpattern.caller.workflows import CallerWorkflow -NAMESPACE = "nexus-sync-operations-caller-namespace" -TASK_QUEUE = "nexus-sync-operations-caller-task-queue" +NAMESPACE = "nexus-messaging-caller-namespace" +TASK_QUEUE = "nexus-messaging-caller-task-queue" -async def execute_caller_workflow( - client: Optional[Client] = None, -) -> None: +async def execute_caller_workflow(client: Optional[Client] = None) -> None: if client is None: config = ClientConfig.load_client_connect_config() config.setdefault("target_host", "localhost:7233") @@ -28,7 +26,8 @@ async def execute_caller_workflow( ): log = await client.execute_workflow( CallerWorkflow.run, - id=str(uuid.uuid4()), + arg="user-1", + id=f"nexus-messaging-caller-{uuid.uuid4()}", task_queue=TASK_QUEUE, ) for line in log: diff --git a/nexus_messaging/callerpattern/caller/workflows.py b/nexus_messaging/callerpattern/caller/workflows.py new file mode 100644 index 00000000..04f79dad --- /dev/null +++ b/nexus_messaging/callerpattern/caller/workflows.py @@ -0,0 +1,73 @@ +""" +A caller workflow that executes Nexus operations. The caller does not have information +about how these operations are implemented by the Nexus service. +""" + +from temporalio import workflow +from temporalio.exceptions import ApplicationError + +from nexus_messaging.callerpattern.service import ( + ApproveInput, + GetLanguageInput, + GetLanguagesInput, + Language, + NexusGreetingService, + SetLanguageInput, +) + +NEXUS_ENDPOINT = "nexus-messaging-nexus-endpoint" + + +@workflow.defn +class CallerWorkflow: + @workflow.run + async def run(self, user_id: str) -> list[str]: + log: list[str] = [] + nexus_client = workflow.create_nexus_client( + service=NexusGreetingService, + endpoint=NEXUS_ENDPOINT, + ) + + # Call a Nexus operation backed by a query against the entity workflow. + # The workflow must already be running on the handler, otherwise you will + # get an error saying the workflow has already terminated. + languages_output = await nexus_client.execute_operation( + NexusGreetingService.get_languages, + GetLanguagesInput(include_unsupported=False, user_id=user_id), + ) + log.append(f"Supported languages: {languages_output.languages}") + workflow.logger.info("Supported languages: %s", languages_output.languages) + + # Following are examples for each of the three messaging types - + # update, query, then signal. + + # Call a Nexus operation backed by an update against the entity workflow. + previous_language = await nexus_client.execute_operation( + NexusGreetingService.set_language, + SetLanguageInput(language=Language.ARABIC, user_id=user_id), + ) + + # Call a Nexus operation backed by a query to confirm the language change. + current_language = await nexus_client.execute_operation( + NexusGreetingService.get_language, + GetLanguageInput(user_id=user_id), + ) + if current_language != Language.ARABIC: + raise ApplicationError(f"Expected language ARABIC, got {current_language}") + + log.append( + f"Language changed: {previous_language.name} -> {Language.ARABIC.name}" + ) + workflow.logger.info( + "Language changed from %s to %s", previous_language, Language.ARABIC + ) + + # Call a Nexus operation backed by a signal against the entity workflow. + await nexus_client.execute_operation( + NexusGreetingService.approve, + ApproveInput(name="caller", user_id=user_id), + ) + log.append("Workflow approved") + workflow.logger.info("Workflow approved") + + return log diff --git a/nexus_messaging/callerpattern/handler/__init__.py b/nexus_messaging/callerpattern/handler/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/nexus_messaging/callerpattern/handler/activities.py b/nexus_messaging/callerpattern/handler/activities.py new file mode 100644 index 00000000..4031b34f --- /dev/null +++ b/nexus_messaging/callerpattern/handler/activities.py @@ -0,0 +1,22 @@ +import asyncio +from typing import Optional + +from temporalio import activity + +from nexus_messaging.callerpattern.service import Language + + +@activity.defn +async def call_greeting_service(language: Language) -> Optional[str]: + """Simulates a call to a remote greeting service. Returns None if unsupported.""" + greetings = { + Language.ARABIC: "\u0645\u0631\u062d\u0628\u0627 \u0628\u0627\u0644\u0639\u0627\u0644\u0645", + Language.CHINESE: "\u4f60\u597d\uff0c\u4e16\u754c", + Language.ENGLISH: "Hello, world", + Language.FRENCH: "Bonjour, monde", + Language.HINDI: "\u0928\u092e\u0938\u094d\u0924\u0947 \u0926\u0941\u0928\u093f\u092f\u093e", + Language.PORTUGUESE: "Ol\u00e1 mundo", + Language.SPANISH: "Hola mundo", + } + await asyncio.sleep(0.2) + return greetings.get(language) diff --git a/nexus_messaging/callerpattern/handler/service_handler.py b/nexus_messaging/callerpattern/handler/service_handler.py new file mode 100644 index 00000000..cbc57ead --- /dev/null +++ b/nexus_messaging/callerpattern/handler/service_handler.py @@ -0,0 +1,80 @@ +""" +Nexus operation handler implementation for the entity pattern. Each operation receives a +user_id, which is mapped to a workflow ID. The operations are synchronous because queries +and updates against a running workflow complete quickly. +""" + +from __future__ import annotations + +import nexusrpc +from temporalio import nexus +from temporalio.client import WorkflowHandle + +from nexus_messaging.callerpattern.handler.workflows import GreetingWorkflow +from nexus_messaging.callerpattern.service import ( + ApproveInput, + ApproveOutput, + GetLanguageInput, + GetLanguagesInput, + GetLanguagesOutput, + Language, + NexusGreetingService, + SetLanguageInput, +) + +WORKFLOW_ID_PREFIX = "GreetingWorkflow_for_" + + +def get_workflow_id(user_id: str) -> str: + """Map a user ID to a workflow ID. + + This example assumes you might have multiple workflows, one for each user. + If you had a single workflow for all users, you could remove this function, + remove the user_id from each input, and just use a single workflow ID. + """ + return f"{WORKFLOW_ID_PREFIX}{user_id}" + + +@nexusrpc.handler.service_handler(service=NexusGreetingService) +class NexusGreetingServiceHandler: + def _get_workflow_handle( + self, user_id: str + ) -> WorkflowHandle[GreetingWorkflow, str]: + return nexus.client().get_workflow_handle_for( + GreetingWorkflow.run, get_workflow_id(user_id) + ) + + @nexusrpc.handler.sync_operation + async def get_languages( + self, ctx: nexusrpc.handler.StartOperationContext, input: GetLanguagesInput + ) -> GetLanguagesOutput: + return await self._get_workflow_handle(input.user_id).query( + GreetingWorkflow.get_languages, input + ) + + @nexusrpc.handler.sync_operation + async def get_language( + self, ctx: nexusrpc.handler.StartOperationContext, input: GetLanguageInput + ) -> Language: + return await self._get_workflow_handle(input.user_id).query( + GreetingWorkflow.get_language + ) + + # Routes to set_language_using_activity (not set_language) so that new languages not + # already in the greetings map can be fetched via an activity. + @nexusrpc.handler.sync_operation + async def set_language( + self, ctx: nexusrpc.handler.StartOperationContext, input: SetLanguageInput + ) -> Language: + return await self._get_workflow_handle(input.user_id).execute_update( + GreetingWorkflow.set_language_using_activity, input + ) + + @nexusrpc.handler.sync_operation + async def approve( + self, ctx: nexusrpc.handler.StartOperationContext, input: ApproveInput + ) -> ApproveOutput: + await self._get_workflow_handle(input.user_id).signal( + GreetingWorkflow.approve, input + ) + return ApproveOutput() diff --git a/nexus_messaging/callerpattern/handler/worker.py b/nexus_messaging/callerpattern/handler/worker.py new file mode 100644 index 00000000..fa8e2c0f --- /dev/null +++ b/nexus_messaging/callerpattern/handler/worker.py @@ -0,0 +1,62 @@ +import asyncio +import logging +from typing import Optional + +from temporalio.client import Client +from temporalio.common import WorkflowIDConflictPolicy +from temporalio.envconfig import ClientConfig +from temporalio.worker import Worker + +from nexus_messaging.callerpattern.handler.activities import call_greeting_service +from nexus_messaging.callerpattern.handler.service_handler import ( + NexusGreetingServiceHandler, + get_workflow_id, +) +from nexus_messaging.callerpattern.handler.workflows import GreetingWorkflow + +interrupt_event = asyncio.Event() + +NAMESPACE = "nexus-messaging-handler-namespace" +TASK_QUEUE = "nexus-messaging-handler-task-queue" +USER_ID = "user-1" + + +async def main(client: Optional[Client] = None): + logging.basicConfig(level=logging.INFO) + + if client is None: + config = ClientConfig.load_client_connect_config() + config.setdefault("target_host", "localhost:7233") + config.setdefault("namespace", NAMESPACE) + client = await Client.connect(**config) + + # Start the long-running entity workflow that backs the Nexus service, + # if not already running. + workflow_id = get_workflow_id(USER_ID) + await client.start_workflow( + GreetingWorkflow.run, + id=workflow_id, + task_queue=TASK_QUEUE, + id_conflict_policy=WorkflowIDConflictPolicy.USE_EXISTING, + ) + logging.info("Started greeting workflow: %s", workflow_id) + + async with Worker( + client, + task_queue=TASK_QUEUE, + workflows=[GreetingWorkflow], + activities=[call_greeting_service], + nexus_service_handlers=[NexusGreetingServiceHandler()], + ): + logging.info("Handler worker started, ctrl+c to exit") + await interrupt_event.wait() + logging.info("Shutting down") + + +if __name__ == "__main__": + loop = asyncio.new_event_loop() + try: + loop.run_until_complete(main()) + except KeyboardInterrupt: + interrupt_event.set() + loop.run_until_complete(loop.shutdown_asyncgens()) diff --git a/nexus_messaging/callerpattern/handler/workflows.py b/nexus_messaging/callerpattern/handler/workflows.py new file mode 100644 index 00000000..fb10d8c5 --- /dev/null +++ b/nexus_messaging/callerpattern/handler/workflows.py @@ -0,0 +1,87 @@ +""" +A long-running "entity" workflow that backs the NexusGreetingService Nexus operations. +The workflow exposes queries, an update, and a signal. These are private implementation +details of the Nexus service: the caller only interacts via Nexus operations. +""" + +import asyncio +from datetime import timedelta + +from temporalio import workflow +from temporalio.exceptions import ApplicationError + +from nexus_messaging.callerpattern.handler.activities import call_greeting_service +from nexus_messaging.callerpattern.service import ( + ApproveInput, + GetLanguagesInput, + GetLanguagesOutput, + Language, + SetLanguageInput, +) + + +@workflow.defn +class GreetingWorkflow: + def __init__(self) -> None: + self.approved_for_release = False + self.greetings: dict[Language, str] = { + Language.CHINESE: "\u4f60\u597d\uff0c\u4e16\u754c", + Language.ENGLISH: "Hello, world", + } + self.language = Language.ENGLISH + self.lock = asyncio.Lock() + + @workflow.run + async def run(self) -> str: + # Wait until approved and all in-flight update handlers have finished. + await workflow.wait_condition( + lambda: self.approved_for_release and workflow.all_handlers_finished() + ) + return self.greetings[self.language] + + @workflow.query + def get_languages(self, input: GetLanguagesInput) -> GetLanguagesOutput: + if input.include_unsupported: + languages = sorted(Language) + else: + languages = sorted(self.greetings) + return GetLanguagesOutput(languages=languages) + + @workflow.query + def get_language(self) -> Language: + return self.language + + @workflow.signal + def approve(self, input: ApproveInput) -> None: + workflow.logger.info("Approval signal received for user %s", input.user_id) + self.approved_for_release = True + + @workflow.update + def set_language(self, input: SetLanguageInput) -> Language: + workflow.logger.info("setLanguage update received for user %s", input.user_id) + previous_language, self.language = self.language, input.language + return previous_language + + @set_language.validator + def validate_set_language(self, input: SetLanguageInput) -> None: + if input.language not in self.greetings: + raise ValueError(f"{input.language.name} is not supported") + + # Changes the active language, calling an activity to fetch a greeting for new + # languages not already in the greetings map. + @workflow.update + async def set_language_using_activity(self, input: SetLanguageInput) -> Language: + if input.language not in self.greetings: + async with self.lock: + greeting = await workflow.execute_activity( + call_greeting_service, + input.language, + start_to_close_timeout=timedelta(seconds=10), + ) + if greeting is None: + raise ApplicationError( + f"Greeting service does not support {input.language.name}" + ) + self.greetings[input.language] = greeting + previous_language, self.language = self.language, input.language + return previous_language diff --git a/nexus_messaging/callerpattern/service.py b/nexus_messaging/callerpattern/service.py new file mode 100644 index 00000000..23a550fb --- /dev/null +++ b/nexus_messaging/callerpattern/service.py @@ -0,0 +1,67 @@ +""" +Nexus service definition for the caller (entity) pattern. Shared between the handler and +caller. The caller uses this to create a type-safe Nexus client; the handler implements +the operations. + +Every operation includes a user_id so the handler knows which entity workflow to target. +""" + +from dataclasses import dataclass +from enum import IntEnum + +import nexusrpc + + +class Language(IntEnum): + ARABIC = 1 + CHINESE = 2 + ENGLISH = 3 + FRENCH = 4 + HINDI = 5 + PORTUGUESE = 6 + SPANISH = 7 + + +@dataclass +class GetLanguagesInput: + include_unsupported: bool + user_id: str + + +@dataclass +class GetLanguagesOutput: + languages: list[Language] + + +@dataclass +class GetLanguageInput: + user_id: str + + +@dataclass +class SetLanguageInput: + language: Language + user_id: str + + +@dataclass +class ApproveInput: + name: str + user_id: str + + +@dataclass +class ApproveOutput: + pass + + +@nexusrpc.service +class NexusGreetingService: + # Returns the languages supported by the greeting workflow. + get_languages: nexusrpc.Operation[GetLanguagesInput, GetLanguagesOutput] + # Returns the currently active language. + get_language: nexusrpc.Operation[GetLanguageInput, Language] + # Changes the active language, returning the previous one. + set_language: nexusrpc.Operation[SetLanguageInput, Language] + # Approves the workflow, allowing it to complete. + approve: nexusrpc.Operation[ApproveInput, ApproveOutput] diff --git a/nexus_messaging/endpoint_description.md b/nexus_messaging/endpoint_description.md new file mode 100644 index 00000000..4184134b --- /dev/null +++ b/nexus_messaging/endpoint_description.md @@ -0,0 +1,14 @@ +## Services + +### [NexusGreetingService](https://github.com/temporalio/samples-python/blob/main/nexus_messaging/callerpattern/service.py) (callerpattern) +- operation: `get_languages` +- operation: `get_language` +- operation: `set_language` +- operation: `approve` + +### [NexusRemoteGreetingService](https://github.com/temporalio/samples-python/blob/main/nexus_messaging/ondemandpattern/service.py) (ondemandpattern) +- operation: `run_from_remote` +- operation: `get_languages` +- operation: `get_language` +- operation: `set_language` +- operation: `approve` diff --git a/nexus_messaging/ondemandpattern/README.md b/nexus_messaging/ondemandpattern/README.md new file mode 100644 index 00000000..53c3dae6 --- /dev/null +++ b/nexus_messaging/ondemandpattern/README.md @@ -0,0 +1,60 @@ +## On-demand pattern + +No workflow is pre-started. The caller creates and controls workflow instances through Nexus +operations. `NexusRemoteGreetingService` adds a `run_from_remote` operation that starts a new +`GreetingWorkflow`, and every other operation includes a `user_id` so the handler knows which +instance to target. + +The caller workflow: +1. Starts two remote `GreetingWorkflow` instances via `run_from_remote` (backed by `workflow_run_operation`) +2. Queries each for supported languages +3. Changes the language on each (Arabic and Hindi) +4. Confirms the changes via queries +5. Approves both workflows +6. Waits for each to complete and returns their results + +### Running + +Start a Temporal server: + +```bash +temporal server start-dev +``` + +Create the namespaces and Nexus endpoint: + +```bash +temporal operator namespace create --namespace nexus-messaging-handler-namespace +temporal operator namespace create --namespace nexus-messaging-caller-namespace + +temporal operator nexus endpoint create \ + --name nexus-messaging-nexus-endpoint \ + --target-namespace nexus-messaging-handler-namespace \ + --target-task-queue nexus-messaging-handler-task-queue +``` + +In one terminal, start the handler worker: + +```bash +uv run python -m nexus_messaging.ondemandpattern.handler.worker +``` + +In another terminal, run the caller workflow: + +```bash +uv run python -m nexus_messaging.ondemandpattern.caller.app +``` + +Expected output: + +``` +started remote greeting workflow: UserId One +started remote greeting workflow: UserId Two +Supported languages for UserId One: [, ] +Supported languages for UserId Two: [, ] +UserId One changed language: ENGLISH -> ARABIC +UserId Two changed language: ENGLISH -> HINDI +Workflows approved +Workflow one result: مرحبا بالعالم +Workflow two result: नमस्ते दुनिया +``` diff --git a/nexus_messaging/ondemandpattern/__init__.py b/nexus_messaging/ondemandpattern/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/nexus_messaging/ondemandpattern/caller/__init__.py b/nexus_messaging/ondemandpattern/caller/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/nexus_messaging/ondemandpattern/caller/app.py b/nexus_messaging/ondemandpattern/caller/app.py new file mode 100644 index 00000000..a1837bab --- /dev/null +++ b/nexus_messaging/ondemandpattern/caller/app.py @@ -0,0 +1,41 @@ +import asyncio +import uuid +from typing import Optional + +from temporalio.client import Client +from temporalio.envconfig import ClientConfig +from temporalio.worker import Worker + +from nexus_messaging.ondemandpattern.caller.workflows import CallerRemoteWorkflow + +NAMESPACE = "nexus-messaging-caller-namespace" +TASK_QUEUE = "nexus-messaging-caller-remote-task-queue" + + +async def execute_caller_workflow(client: Optional[Client] = None) -> None: + if client is None: + config = ClientConfig.load_client_connect_config() + config.setdefault("target_host", "localhost:7233") + config.setdefault("namespace", NAMESPACE) + client = await Client.connect(**config) + + async with Worker( + client, + task_queue=TASK_QUEUE, + workflows=[CallerRemoteWorkflow], + ): + log = await client.execute_workflow( + CallerRemoteWorkflow.run, + id=f"nexus-messaging-remote-caller-{uuid.uuid4()}", + task_queue=TASK_QUEUE, + ) + for line in log: + print(line) + + +if __name__ == "__main__": + loop = asyncio.new_event_loop() + try: + loop.run_until_complete(execute_caller_workflow()) + except KeyboardInterrupt: + loop.run_until_complete(loop.shutdown_asyncgens()) diff --git a/nexus_messaging/ondemandpattern/caller/workflows.py b/nexus_messaging/ondemandpattern/caller/workflows.py new file mode 100644 index 00000000..8dbbd4b2 --- /dev/null +++ b/nexus_messaging/ondemandpattern/caller/workflows.py @@ -0,0 +1,148 @@ +""" +A caller workflow that creates and controls workflow instances through Nexus operations. +Unlike the entity (callerpattern), no workflow is pre-started; the caller creates them +on demand via the run_from_remote operation. +""" + +from temporalio import workflow + +from nexus_messaging.ondemandpattern.service import ( + ApproveInput, + GetLanguageInput, + GetLanguagesInput, + Language, + NexusRemoteGreetingService, + RunFromRemoteInput, + SetLanguageInput, +) + +NEXUS_ENDPOINT = "nexus-messaging-nexus-endpoint" + +REMOTE_WORKFLOW_ONE = "UserId One" +REMOTE_WORKFLOW_TWO = "UserId Two" + + +@workflow.defn +class CallerRemoteWorkflow: + def __init__(self) -> None: + self.nexus_client = workflow.create_nexus_client( + service=NexusRemoteGreetingService, + endpoint=NEXUS_ENDPOINT, + ) + + @workflow.run + async def run(self) -> list[str]: + log: list[str] = [] + + # Each call is performed twice in this example. This assumes there are two + # users we want to process. The first calls start two workflows, one for each + # user. Subsequent calls perform different actions between the two users. + + # This is an async Nexus operation -- starts a workflow on the handler and + # returns a handle. Unlike the sync operations below, this does not block + # until the workflow completes. It is backed by workflow_run_operation on the + # handler side. + handle_one = await self.nexus_client.start_operation( + NexusRemoteGreetingService.run_from_remote, + RunFromRemoteInput(user_id=REMOTE_WORKFLOW_ONE), + ) + log.append(f"started remote greeting workflow: {REMOTE_WORKFLOW_ONE}") + workflow.logger.info("started remote greeting workflow %s", REMOTE_WORKFLOW_ONE) + + handle_two = await self.nexus_client.start_operation( + NexusRemoteGreetingService.run_from_remote, + RunFromRemoteInput(user_id=REMOTE_WORKFLOW_TWO), + ) + log.append(f"started remote greeting workflow: {REMOTE_WORKFLOW_TWO}") + workflow.logger.info("started remote greeting workflow %s", REMOTE_WORKFLOW_TWO) + + # Query the remote workflows for supported languages. + languages_output = await self.nexus_client.execute_operation( + NexusRemoteGreetingService.get_languages, + GetLanguagesInput(include_unsupported=False, user_id=REMOTE_WORKFLOW_ONE), + ) + log.append( + f"Supported languages for {REMOTE_WORKFLOW_ONE}: " + f"{languages_output.languages}" + ) + workflow.logger.info( + "supported languages are %s for workflow %s", + languages_output.languages, + REMOTE_WORKFLOW_ONE, + ) + + languages_output = await self.nexus_client.execute_operation( + NexusRemoteGreetingService.get_languages, + GetLanguagesInput(include_unsupported=False, user_id=REMOTE_WORKFLOW_TWO), + ) + log.append( + f"Supported languages for {REMOTE_WORKFLOW_TWO}: " + f"{languages_output.languages}" + ) + workflow.logger.info( + "supported languages are %s for workflow %s", + languages_output.languages, + REMOTE_WORKFLOW_TWO, + ) + + # Update the language on each remote workflow. + previous_language_one = await self.nexus_client.execute_operation( + NexusRemoteGreetingService.set_language, + SetLanguageInput(language=Language.ARABIC, user_id=REMOTE_WORKFLOW_ONE), + ) + + previous_language_two = await self.nexus_client.execute_operation( + NexusRemoteGreetingService.set_language, + SetLanguageInput(language=Language.HINDI, user_id=REMOTE_WORKFLOW_TWO), + ) + + # Confirm the changes by querying. + current_language = await self.nexus_client.execute_operation( + NexusRemoteGreetingService.get_language, + GetLanguageInput(user_id=REMOTE_WORKFLOW_ONE), + ) + log.append( + f"{REMOTE_WORKFLOW_ONE} changed language: " + f"{previous_language_one.name} -> {current_language.name}" + ) + workflow.logger.info( + "Language changed from %s to %s for workflow %s", + previous_language_one, + current_language, + REMOTE_WORKFLOW_ONE, + ) + + current_language = await self.nexus_client.execute_operation( + NexusRemoteGreetingService.get_language, + GetLanguageInput(user_id=REMOTE_WORKFLOW_TWO), + ) + log.append( + f"{REMOTE_WORKFLOW_TWO} changed language: " + f"{previous_language_two.name} -> {current_language.name}" + ) + workflow.logger.info( + "Language changed from %s to %s for workflow %s", + previous_language_two, + current_language, + REMOTE_WORKFLOW_TWO, + ) + + # Approve both workflows so they can complete. + await self.nexus_client.execute_operation( + NexusRemoteGreetingService.approve, + ApproveInput(name="remote-caller", user_id=REMOTE_WORKFLOW_ONE), + ) + await self.nexus_client.execute_operation( + NexusRemoteGreetingService.approve, + ApproveInput(name="remote-caller", user_id=REMOTE_WORKFLOW_TWO), + ) + log.append("Workflows approved") + + # Wait for the remote workflows to finish and return their results. + result = await handle_one + log.append(f"Workflow one result: {result}") + + result = await handle_two + log.append(f"Workflow two result: {result}") + + return log diff --git a/nexus_messaging/ondemandpattern/handler/__init__.py b/nexus_messaging/ondemandpattern/handler/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/nexus_messaging/ondemandpattern/handler/activities.py b/nexus_messaging/ondemandpattern/handler/activities.py new file mode 100644 index 00000000..ba028489 --- /dev/null +++ b/nexus_messaging/ondemandpattern/handler/activities.py @@ -0,0 +1,22 @@ +import asyncio +from typing import Optional + +from temporalio import activity + +from nexus_messaging.ondemandpattern.service import Language + + +@activity.defn +async def call_greeting_service(language: Language) -> Optional[str]: + """Simulates a call to a remote greeting service. Returns None if unsupported.""" + greetings = { + Language.ARABIC: "\u0645\u0631\u062d\u0628\u0627 \u0628\u0627\u0644\u0639\u0627\u0644\u0645", + Language.CHINESE: "\u4f60\u597d\uff0c\u4e16\u754c", + Language.ENGLISH: "Hello, world", + Language.FRENCH: "Bonjour, monde", + Language.HINDI: "\u0928\u092e\u0938\u094d\u0924\u0947 \u0926\u0941\u0928\u093f\u092f\u093e", + Language.PORTUGUESE: "Ol\u00e1 mundo", + Language.SPANISH: "Hola mundo", + } + await asyncio.sleep(0.2) + return greetings.get(language) diff --git a/nexus_messaging/ondemandpattern/handler/service_handler.py b/nexus_messaging/ondemandpattern/handler/service_handler.py new file mode 100644 index 00000000..1351aae7 --- /dev/null +++ b/nexus_messaging/ondemandpattern/handler/service_handler.py @@ -0,0 +1,84 @@ +""" +Nexus operation handler for the on-demand pattern. Each operation receives the target +userId in its input, and run_from_remote starts a brand-new GreetingWorkflow. +""" + +from __future__ import annotations + +import nexusrpc +from temporalio import nexus +from temporalio.client import WorkflowHandle + +from nexus_messaging.ondemandpattern.handler.workflows import GreetingWorkflow +from nexus_messaging.ondemandpattern.service import ( + ApproveInput, + ApproveOutput, + GetLanguageInput, + GetLanguagesInput, + GetLanguagesOutput, + Language, + NexusRemoteGreetingService, + RunFromRemoteInput, + SetLanguageInput, +) + +WORKFLOW_ID_PREFIX = "GreetingWorkflow_for_" + + +@nexusrpc.handler.service_handler(service=NexusRemoteGreetingService) +class NexusRemoteGreetingServiceHandler: + def _get_workflow_id(self, user_id: str) -> str: + return WORKFLOW_ID_PREFIX + user_id + + def _get_workflow_handle( + self, user_id: str + ) -> WorkflowHandle[GreetingWorkflow, str]: + return nexus.client().get_workflow_handle_for( + GreetingWorkflow.run, self._get_workflow_id(user_id) + ) + + # Starts a new GreetingWorkflow with the caller-specified user ID. + # This is an async Nexus operation backed by workflow_run_operation. + @nexus.workflow_run_operation + async def run_from_remote( + self, ctx: nexus.WorkflowRunOperationContext, input: RunFromRemoteInput + ) -> nexus.WorkflowHandle[str]: + return await ctx.start_workflow( + GreetingWorkflow.run, + id=self._get_workflow_id(input.user_id), + ) + + @nexusrpc.handler.sync_operation + async def get_languages( + self, ctx: nexusrpc.handler.StartOperationContext, input: GetLanguagesInput + ) -> GetLanguagesOutput: + return await self._get_workflow_handle(input.user_id).query( + GreetingWorkflow.get_languages, input + ) + + @nexusrpc.handler.sync_operation + async def get_language( + self, ctx: nexusrpc.handler.StartOperationContext, input: GetLanguageInput + ) -> Language: + return await self._get_workflow_handle(input.user_id).query( + GreetingWorkflow.get_language, + ) + + # Routes to set_language_using_activity so that new languages not already in the + # greetings map can be fetched via an activity. + @nexusrpc.handler.sync_operation + async def set_language( + self, ctx: nexusrpc.handler.StartOperationContext, input: SetLanguageInput + ) -> Language: + return await self._get_workflow_handle(input.user_id).execute_update( + GreetingWorkflow.set_language_using_activity, input + ) + + @nexusrpc.handler.sync_operation + async def approve( + self, ctx: nexusrpc.handler.StartOperationContext, input: ApproveInput + ) -> ApproveOutput: + await self._get_workflow_handle(input.user_id).signal( + GreetingWorkflow.approve, input + ) + return ApproveOutput() diff --git a/nexus_sync_operations/handler/worker.py b/nexus_messaging/ondemandpattern/handler/worker.py similarity index 58% rename from nexus_sync_operations/handler/worker.py rename to nexus_messaging/ondemandpattern/handler/worker.py index 97c8eb04..5eec9cfc 100644 --- a/nexus_sync_operations/handler/worker.py +++ b/nexus_messaging/ondemandpattern/handler/worker.py @@ -6,14 +6,16 @@ from temporalio.envconfig import ClientConfig from temporalio.worker import Worker -from message_passing.introduction.activities import call_greeting_service -from message_passing.introduction.workflows import GreetingWorkflow -from nexus_sync_operations.handler.service_handler import GreetingServiceHandler +from nexus_messaging.ondemandpattern.handler.activities import call_greeting_service +from nexus_messaging.ondemandpattern.handler.service_handler import ( + NexusRemoteGreetingServiceHandler, +) +from nexus_messaging.ondemandpattern.handler.workflows import GreetingWorkflow interrupt_event = asyncio.Event() -NAMESPACE = "nexus-sync-operations-handler-namespace" -TASK_QUEUE = "nexus-sync-operations-handler-task-queue" +NAMESPACE = "nexus-messaging-handler-namespace" +TASK_QUEUE = "nexus-messaging-handler-task-queue" async def main(client: Optional[Client] = None): @@ -25,20 +27,14 @@ async def main(client: Optional[Client] = None): config.setdefault("namespace", NAMESPACE) client = await Client.connect(**config) - # Create the nexus service handler instance, starting the long-running entity workflow that - # backs the Nexus service - greeting_service_handler = await GreetingServiceHandler.create( - "nexus-sync-operations-greeting-workflow", client, TASK_QUEUE - ) - async with Worker( client, task_queue=TASK_QUEUE, workflows=[GreetingWorkflow], activities=[call_greeting_service], - nexus_service_handlers=[greeting_service_handler], + nexus_service_handlers=[NexusRemoteGreetingServiceHandler()], ): - logging.info("Worker started, ctrl+c to exit") + logging.info("Handler worker started, ctrl+c to exit") await interrupt_event.wait() logging.info("Shutting down") diff --git a/nexus_messaging/ondemandpattern/handler/workflows.py b/nexus_messaging/ondemandpattern/handler/workflows.py new file mode 100644 index 00000000..99fc5728 --- /dev/null +++ b/nexus_messaging/ondemandpattern/handler/workflows.py @@ -0,0 +1,88 @@ +""" +A long-running "entity" workflow that backs the NexusRemoteGreetingService Nexus +operations. The workflow exposes queries, an update, and a signal. These are private +implementation details of the Nexus service: the caller only interacts via Nexus +operations. +""" + +import asyncio +from datetime import timedelta + +from temporalio import workflow +from temporalio.exceptions import ApplicationError + +from nexus_messaging.ondemandpattern.handler.activities import call_greeting_service +from nexus_messaging.ondemandpattern.service import ( + ApproveInput, + GetLanguagesInput, + GetLanguagesOutput, + Language, + SetLanguageInput, +) + + +@workflow.defn +class GreetingWorkflow: + def __init__(self) -> None: + self.approved_for_release = False + self.greetings: dict[Language, str] = { + Language.CHINESE: "\u4f60\u597d\uff0c\u4e16\u754c", + Language.ENGLISH: "Hello, world", + } + self.language = Language.ENGLISH + self.lock = asyncio.Lock() + + @workflow.run + async def run(self) -> str: + # Wait until approved and all in-flight update handlers have finished. + await workflow.wait_condition( + lambda: self.approved_for_release and workflow.all_handlers_finished() + ) + return self.greetings[self.language] + + @workflow.query + def get_languages(self, input: GetLanguagesInput) -> GetLanguagesOutput: + if input.include_unsupported: + languages = sorted(Language) + else: + languages = sorted(self.greetings) + return GetLanguagesOutput(languages=languages) + + @workflow.query + def get_language(self) -> Language: + return self.language + + @workflow.signal + def approve(self, input: ApproveInput) -> None: + workflow.logger.info("Approval signal received for user %s", input.user_id) + self.approved_for_release = True + + @workflow.update + def set_language(self, input: SetLanguageInput) -> Language: + workflow.logger.info("setLanguage update received for user %s", input.user_id) + previous_language, self.language = self.language, input.language + return previous_language + + @set_language.validator + def validate_set_language(self, input: SetLanguageInput) -> None: + if input.language not in self.greetings: + raise ValueError(f"{input.language.name} is not supported") + + # Changes the active language, calling an activity to fetch a greeting for new + # languages not already in the greetings map. + @workflow.update + async def set_language_using_activity(self, input: SetLanguageInput) -> Language: + if input.language not in self.greetings: + async with self.lock: + greeting = await workflow.execute_activity( + call_greeting_service, + input.language, + start_to_close_timeout=timedelta(seconds=10), + ) + if greeting is None: + raise ApplicationError( + f"Greeting service does not support {input.language.name}" + ) + self.greetings[input.language] = greeting + previous_language, self.language = self.language, input.language + return previous_language diff --git a/nexus_messaging/ondemandpattern/service.py b/nexus_messaging/ondemandpattern/service.py new file mode 100644 index 00000000..8f347d32 --- /dev/null +++ b/nexus_messaging/ondemandpattern/service.py @@ -0,0 +1,72 @@ +""" +Nexus service definition for the on-demand pattern. Every operation includes a userId +so the caller controls which workflow instance is targeted. This also exposes a +run_from_remote operation that starts a new GreetingWorkflow. +""" + +from dataclasses import dataclass +from enum import IntEnum + +import nexusrpc + + +class Language(IntEnum): + ARABIC = 1 + CHINESE = 2 + ENGLISH = 3 + FRENCH = 4 + HINDI = 5 + PORTUGUESE = 6 + SPANISH = 7 + + +@dataclass +class RunFromRemoteInput: + user_id: str + + +@dataclass +class GetLanguagesInput: + include_unsupported: bool + user_id: str + + +@dataclass +class GetLanguagesOutput: + languages: list[Language] + + +@dataclass +class GetLanguageInput: + user_id: str + + +@dataclass +class SetLanguageInput: + language: Language + user_id: str + + +@dataclass +class ApproveInput: + name: str + user_id: str + + +@dataclass +class ApproveOutput: + pass + + +@nexusrpc.service +class NexusRemoteGreetingService: + # Starts a new GreetingWorkflow with the given workflow ID (asynchronous). + run_from_remote: nexusrpc.Operation[RunFromRemoteInput, str] + # Returns the languages supported by the specified workflow. + get_languages: nexusrpc.Operation[GetLanguagesInput, GetLanguagesOutput] + # Returns the currently active language of the specified workflow. + get_language: nexusrpc.Operation[GetLanguageInput, Language] + # Changes the active language on the specified workflow, returning the previous one. + set_language: nexusrpc.Operation[SetLanguageInput, Language] + # Approves the specified workflow, allowing it to complete. + approve: nexusrpc.Operation[ApproveInput, ApproveOutput] diff --git a/nexus_sync_operations/README.md b/nexus_sync_operations/README.md deleted file mode 100644 index 10e266ec..00000000 --- a/nexus_sync_operations/README.md +++ /dev/null @@ -1,39 +0,0 @@ -This sample shows how to create a Nexus service that is backed by a long-running workflow and -exposes operations that execute updates and queries against that workflow. The long-running -workflow, and the updates/queries are private implementation detail of the nexus service: the caller -does not know how the operations are implemented. - -### Sample directory structure - -- [service.py](./service.py) - shared Nexus service definition -- [caller](./caller) - a caller workflow that executes Nexus operations, together with a worker and starter code -- [handler](./handler) - Nexus operation handlers, together with a workflow used by one of the Nexus operations, and a worker that polls for both workflow, activity, and Nexus tasks. - - -### Instructions - -Start a Temporal server. (See the main samples repo [README](../README.md)). - -Run the following to create the caller and handler namespaces, and the Nexus endpoint: - -``` -temporal operator namespace create --namespace nexus-sync-operations-handler-namespace -temporal operator namespace create --namespace nexus-sync-operations-caller-namespace - -temporal operator nexus endpoint create \ - --name nexus-sync-operations-nexus-endpoint \ - --target-namespace nexus-sync-operations-handler-namespace \ - --target-task-queue nexus-sync-operations-handler-task-queue \ - --description-file nexus_sync_operations/endpoint_description.md -``` - -In one terminal, run the Temporal worker in the handler namespace: -``` -uv run nexus_sync_operations/handler/worker.py -``` - -In another terminal, run the Temporal worker in the caller namespace and start the caller -workflow: -``` -uv run nexus_sync_operations/caller/app.py -``` diff --git a/nexus_sync_operations/caller/workflows.py b/nexus_sync_operations/caller/workflows.py deleted file mode 100644 index a358d764..00000000 --- a/nexus_sync_operations/caller/workflows.py +++ /dev/null @@ -1,46 +0,0 @@ -""" -This is a workflow that calls nexus operations. The caller does not have information about how these -operations are implemented by the nexus service. -""" - -from temporalio import workflow - -from message_passing.introduction import Language -from message_passing.introduction.workflows import GetLanguagesInput, SetLanguageInput - -with workflow.unsafe.imports_passed_through(): - from nexus_sync_operations.service import GreetingService - -NEXUS_ENDPOINT = "nexus-sync-operations-nexus-endpoint" - - -@workflow.defn -class CallerWorkflow: - @workflow.run - async def run(self) -> list[str]: - log = [] - nexus_client = workflow.create_nexus_client( - service=GreetingService, - endpoint=NEXUS_ENDPOINT, - ) - - # Get supported languages - supported_languages = await nexus_client.execute_operation( - GreetingService.get_languages, GetLanguagesInput(include_unsupported=False) - ) - log.append(f"supported languages: {supported_languages}") - - # Set language - previous_language = await nexus_client.execute_operation( - GreetingService.set_language, - SetLanguageInput(language=Language.ARABIC), - ) - assert ( - await nexus_client.execute_operation(GreetingService.get_language, None) - == Language.ARABIC - ) - log.append( - f"language changed: {previous_language.name} -> {Language.ARABIC.name}" - ) - - return log diff --git a/nexus_sync_operations/endpoint_description.md b/nexus_sync_operations/endpoint_description.md deleted file mode 100644 index a33b60cf..00000000 --- a/nexus_sync_operations/endpoint_description.md +++ /dev/null @@ -1,4 +0,0 @@ -## Service: [GreetingService](https://github.com/temporalio/samples-python/blob/main/nexus_sync_operations/service.py) -- operation: `get_languages` -- operation: `get_language` -- operation: `set_language` diff --git a/nexus_sync_operations/handler/service_handler.py b/nexus_sync_operations/handler/service_handler.py deleted file mode 100644 index 626948f0..00000000 --- a/nexus_sync_operations/handler/service_handler.py +++ /dev/null @@ -1,83 +0,0 @@ -""" -This file demonstrates how to implement a Nexus service that is backed by a long-running workflow -and exposes operations that perform updates and queries against that workflow. -""" - -from __future__ import annotations - -import nexusrpc -from temporalio import nexus -from temporalio.client import Client, WorkflowHandle -from temporalio.common import WorkflowIDConflictPolicy - -from message_passing.introduction import Language -from message_passing.introduction.workflows import ( - GetLanguagesInput, - GreetingWorkflow, - SetLanguageInput, -) -from nexus_sync_operations.service import GreetingService - - -@nexusrpc.handler.service_handler(service=GreetingService) -class GreetingServiceHandler: - def __init__(self, workflow_id: str): - self.workflow_id = workflow_id - - @classmethod - async def create( - cls, workflow_id: str, client: Client, task_queue: str - ) -> GreetingServiceHandler: - # Start the long-running "entity" workflow, if it is not already running. - await client.start_workflow( - GreetingWorkflow.run, - id=workflow_id, - task_queue=task_queue, - id_conflict_policy=WorkflowIDConflictPolicy.USE_EXISTING, - ) - return cls(workflow_id) - - @property - def greeting_workflow_handle(self) -> WorkflowHandle[GreetingWorkflow, str]: - # In nexus operation handler code, nexus.client() is always available, returning a client - # connected to the handler namespace (it's the same client instance that your nexus worker - # is using to poll the server for nexus tasks). This client can be used to interact with the - # handler namespace, for example to send signals, queries, or updates. Remember however, - # that a sync_operation handler must return quickly (no more than a few seconds). To do - # long-running work in a nexus operation handler, use - # temporalio.nexus.workflow_run_operation (see the hello_nexus sample). - return nexus.client().get_workflow_handle_for( - GreetingWorkflow.run, self.workflow_id - ) - - # 👉 This is a handler for a nexus operation whose internal implementation involves executing a - # query against a long-running workflow that is private to the nexus service. - @nexusrpc.handler.sync_operation - async def get_languages( - self, ctx: nexusrpc.handler.StartOperationContext, input: GetLanguagesInput - ) -> list[Language]: - return await self.greeting_workflow_handle.query( - GreetingWorkflow.get_languages, input - ) - - # 👉 This is a handler for a nexus operation whose internal implementation involves executing a - # query against a long-running workflow that is private to the nexus service. - @nexusrpc.handler.sync_operation - async def get_language( - self, ctx: nexusrpc.handler.StartOperationContext, input: None - ) -> Language: - return await self.greeting_workflow_handle.query(GreetingWorkflow.get_language) - - # 👉 This is a handler for a nexus operation whose internal implementation involves executing an - # update against a long-running workflow that is private to the nexus service. Although updates - # can run for an arbitrarily long time, when exposing an update via a nexus sync operation the - # update should execute quickly (sync operations must complete in under 10s). - @nexusrpc.handler.sync_operation - async def set_language( - self, - ctx: nexusrpc.handler.StartOperationContext, - input: SetLanguageInput, - ) -> Language: - return await self.greeting_workflow_handle.execute_update( - GreetingWorkflow.set_language_using_activity, input - ) diff --git a/nexus_sync_operations/service.py b/nexus_sync_operations/service.py deleted file mode 100644 index 3436d5f3..00000000 --- a/nexus_sync_operations/service.py +++ /dev/null @@ -1,20 +0,0 @@ -""" -This module defines a Nexus service that exposes three operations. - -It is used by the nexus service handler to validate that the operation handlers implement the -correct input and output types, and by the caller workflow to create a type-safe client. It does not -contain the implementation of the operations; see nexus_sync_operations.handler.service_handler for -that. -""" - -import nexusrpc - -from message_passing.introduction import Language -from message_passing.introduction.workflows import GetLanguagesInput, SetLanguageInput - - -@nexusrpc.service -class GreetingService: - get_languages: nexusrpc.Operation[GetLanguagesInput, list[Language]] - get_language: nexusrpc.Operation[None, Language] - set_language: nexusrpc.Operation[SetLanguageInput, Language] diff --git a/pyproject.toml b/pyproject.toml index 0927eea3..f57bc875 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -49,7 +49,7 @@ pydantic-converter = ["pydantic>=2.10.6,<3"] sentry = ["sentry-sdk>=2.13.0"] trio-async = ["trio>=0.28.0,<0.29", "trio-asyncio>=0.15.0,<0.16"] cloud-export-to-parquet = [ - "pandas>=2.2.2,<3 ; python_version >= '3.10' and python_version < '4.0'", + "pandas>=2.3.3,<3 ; python_version >= '3.10' and python_version < '4.0'", "numpy>=1.26.0,<2 ; python_version >= '3.10' and python_version < '3.13'", "boto3>=1.34.89,<2", "pyarrow>=19.0.1", diff --git a/tests/nexus_sync_operations/nexus_sync_operations_test.py b/tests/nexus_messaging/callerpattern_test.py similarity index 55% rename from tests/nexus_sync_operations/nexus_sync_operations_test.py rename to tests/nexus_messaging/callerpattern_test.py index d74168cb..d14b4c29 100644 --- a/tests/nexus_sync_operations/nexus_sync_operations_test.py +++ b/tests/nexus_messaging/callerpattern_test.py @@ -8,18 +8,18 @@ from temporalio.testing import WorkflowEnvironment from temporalio.worker import Worker -import nexus_sync_operations.handler.service_handler -import nexus_sync_operations.handler.worker -from message_passing.introduction import Language -from message_passing.introduction.workflows import GetLanguagesInput, SetLanguageInput -from nexus_sync_operations.caller.workflows import CallerWorkflow +import nexus_messaging.callerpattern.handler.worker +from nexus_messaging.callerpattern.caller.workflows import CallerWorkflow +from nexus_messaging.callerpattern.service import ( + GetLanguageInput, + GetLanguagesInput, + Language, + NexusGreetingService, + SetLanguageInput, +) from tests.helpers.nexus import create_nexus_endpoint, delete_nexus_endpoint -with workflow.unsafe.imports_passed_through(): - from nexus_sync_operations.service import GreetingService - - -NEXUS_ENDPOINT = "nexus-sync-operations-nexus-endpoint" +NEXUS_ENDPOINT = "nexus-messaging-nexus-endpoint" @workflow.defn @@ -27,89 +27,90 @@ class TestCallerWorkflow: """Test workflow that calls Nexus operations and makes assertions.""" @workflow.run - async def run(self) -> None: + async def run(self, user_id: str) -> None: nexus_client = workflow.create_nexus_client( - service=GreetingService, + service=NexusGreetingService, endpoint=NEXUS_ENDPOINT, ) supported_languages = await nexus_client.execute_operation( - GreetingService.get_languages, GetLanguagesInput(include_unsupported=False) + NexusGreetingService.get_languages, + GetLanguagesInput(include_unsupported=False, user_id=user_id), ) - assert supported_languages == [Language.CHINESE, Language.ENGLISH] + assert supported_languages.languages == [Language.CHINESE, Language.ENGLISH] initial_language = await nexus_client.execute_operation( - GreetingService.get_language, None + NexusGreetingService.get_language, + GetLanguageInput(user_id=user_id), ) assert initial_language == Language.ENGLISH previous_language = await nexus_client.execute_operation( - GreetingService.set_language, - SetLanguageInput(language=Language.CHINESE), + NexusGreetingService.set_language, + SetLanguageInput(language=Language.CHINESE, user_id=user_id), ) assert previous_language == Language.ENGLISH current_language = await nexus_client.execute_operation( - GreetingService.get_language, None + NexusGreetingService.get_language, + GetLanguageInput(user_id=user_id), ) assert current_language == Language.CHINESE previous_language = await nexus_client.execute_operation( - GreetingService.set_language, - SetLanguageInput(language=Language.ARABIC), + NexusGreetingService.set_language, + SetLanguageInput(language=Language.ARABIC, user_id=user_id), ) assert previous_language == Language.CHINESE current_language = await nexus_client.execute_operation( - GreetingService.get_language, None + NexusGreetingService.get_language, + GetLanguageInput(user_id=user_id), ) assert current_language == Language.ARABIC -async def test_nexus_sync_operations(client: Client, env: WorkflowEnvironment): +async def test_callerpattern(client: Client, env: WorkflowEnvironment): if env.supports_time_skipping: pytest.skip("Nexus tests don't work under the Java test server") await _run_caller_workflow(client, TestCallerWorkflow) -async def test_nexus_sync_operations_caller_workflow( - client: Client, env: WorkflowEnvironment -): - """ - Runs the CallerWorkflow from the sample to ensure it executes without errors. - """ +async def test_callerpattern_caller_workflow(client: Client, env: WorkflowEnvironment): + """Runs the CallerWorkflow from the sample to ensure it executes without errors.""" if env.supports_time_skipping: pytest.skip("Nexus tests don't work under the Java test server") await _run_caller_workflow(client, CallerWorkflow) -async def _run_caller_workflow(client: Client, workflow: Type): +async def _run_caller_workflow(client: Client, wf: Type): create_response = await create_nexus_endpoint( name=NEXUS_ENDPOINT, - task_queue=nexus_sync_operations.handler.worker.TASK_QUEUE, + task_queue=nexus_messaging.callerpattern.handler.worker.TASK_QUEUE, client=client, ) try: handler_worker_task = asyncio.create_task( - nexus_sync_operations.handler.worker.main(client) + nexus_messaging.callerpattern.handler.worker.main(client) ) try: async with Worker( client, task_queue="test-caller-task-queue", - workflows=[workflow], + workflows=[wf], ): await client.execute_workflow( - workflow.run, + wf.run, + arg="user-1", id=str(uuid.uuid4()), task_queue="test-caller-task-queue", ) finally: - nexus_sync_operations.handler.worker.interrupt_event.set() + nexus_messaging.callerpattern.handler.worker.interrupt_event.set() await handler_worker_task - nexus_sync_operations.handler.worker.interrupt_event.clear() + nexus_messaging.callerpattern.handler.worker.interrupt_event.clear() finally: await delete_nexus_endpoint( id=create_response.endpoint.id, diff --git a/tests/nexus_messaging/ondemandpattern_test.py b/tests/nexus_messaging/ondemandpattern_test.py new file mode 100644 index 00000000..75fd761c --- /dev/null +++ b/tests/nexus_messaging/ondemandpattern_test.py @@ -0,0 +1,130 @@ +import asyncio +import uuid +from typing import Type + +import pytest +from temporalio import workflow +from temporalio.client import Client +from temporalio.testing import WorkflowEnvironment +from temporalio.worker import Worker + +import nexus_messaging.ondemandpattern.handler.worker +from nexus_messaging.ondemandpattern.caller.workflows import CallerRemoteWorkflow +from nexus_messaging.ondemandpattern.service import ( + ApproveInput, + GetLanguageInput, + GetLanguagesInput, + Language, + NexusRemoteGreetingService, + RunFromRemoteInput, + SetLanguageInput, +) +from tests.helpers.nexus import create_nexus_endpoint, delete_nexus_endpoint + +NEXUS_ENDPOINT = "nexus-messaging-nexus-endpoint" + + +@workflow.defn +class TestCallerRemoteWorkflow: + """Test workflow that creates remote workflows and makes assertions.""" + + @workflow.run + async def run(self) -> None: + nexus_client = workflow.create_nexus_client( + service=NexusRemoteGreetingService, + endpoint=NEXUS_ENDPOINT, + ) + + workflow_id = f"test-remote-{workflow.uuid4()}" + + # Start a remote workflow. + handle = await nexus_client.start_operation( + NexusRemoteGreetingService.run_from_remote, + RunFromRemoteInput(user_id=workflow_id), + ) + + # Query for supported languages. + languages_output = await nexus_client.execute_operation( + NexusRemoteGreetingService.get_languages, + GetLanguagesInput(include_unsupported=False, user_id=workflow_id), + ) + assert languages_output.languages == [Language.CHINESE, Language.ENGLISH] + + # Check initial language. + initial_language = await nexus_client.execute_operation( + NexusRemoteGreetingService.get_language, + GetLanguageInput(user_id=workflow_id), + ) + assert initial_language == Language.ENGLISH + + # Set language. + previous_language = await nexus_client.execute_operation( + NexusRemoteGreetingService.set_language, + SetLanguageInput(language=Language.ARABIC, user_id=workflow_id), + ) + assert previous_language == Language.ENGLISH + + current_language = await nexus_client.execute_operation( + NexusRemoteGreetingService.get_language, + GetLanguageInput(user_id=workflow_id), + ) + assert current_language == Language.ARABIC + + # Approve and wait for result. + await nexus_client.execute_operation( + NexusRemoteGreetingService.approve, + ApproveInput(name="test", user_id=workflow_id), + ) + + result = await handle + assert "\u0645\u0631\u062d\u0628\u0627" in result # Arabic greeting + + +async def test_ondemandpattern(client: Client, env: WorkflowEnvironment): + if env.supports_time_skipping: + pytest.skip("Nexus tests don't work under the Java test server") + + await _run_caller_workflow(client, TestCallerRemoteWorkflow) + + +async def test_ondemandpattern_caller_workflow( + client: Client, env: WorkflowEnvironment +): + """Runs the CallerRemoteWorkflow from the sample to ensure it executes without errors.""" + if env.supports_time_skipping: + pytest.skip("Nexus tests don't work under the Java test server") + + await _run_caller_workflow(client, CallerRemoteWorkflow) + + +async def _run_caller_workflow(client: Client, wf: Type): + create_response = await create_nexus_endpoint( + name=NEXUS_ENDPOINT, + task_queue=nexus_messaging.ondemandpattern.handler.worker.TASK_QUEUE, + client=client, + ) + try: + handler_worker_task = asyncio.create_task( + nexus_messaging.ondemandpattern.handler.worker.main(client) + ) + try: + async with Worker( + client, + task_queue="test-caller-remote-task-queue", + workflows=[wf], + ): + await client.execute_workflow( + wf.run, + id=str(uuid.uuid4()), + task_queue="test-caller-remote-task-queue", + ) + finally: + nexus_messaging.ondemandpattern.handler.worker.interrupt_event.set() + await handler_worker_task + nexus_messaging.ondemandpattern.handler.worker.interrupt_event.clear() + finally: + await delete_nexus_endpoint( + id=create_response.endpoint.id, + version=create_response.endpoint.version, + client=client, + ) diff --git a/uv.lock b/uv.lock index 3c9990eb..106627c3 100644 --- a/uv.lock +++ b/uv.lock @@ -1556,7 +1556,7 @@ wheels = [ [[package]] name = "pandas" -version = "2.3.1" +version = "2.3.3" source = { registry = "https://pypi.org/simple" } dependencies = [ { name = "numpy" }, @@ -1564,42 +1564,55 @@ dependencies = [ { name = "pytz" }, { name = "tzdata" }, ] -sdist = { url = "https://files.pythonhosted.org/packages/d1/6f/75aa71f8a14267117adeeed5d21b204770189c0a0025acbdc03c337b28fc/pandas-2.3.1.tar.gz", hash = "sha256:0a95b9ac964fe83ce317827f80304d37388ea77616b1425f0ae41c9d2d0d7bb2", size = 4487493, upload-time = "2025-07-07T19:20:04.079Z" } -wheels = [ - { url = "https://files.pythonhosted.org/packages/c4/ca/aa97b47287221fa37a49634532e520300088e290b20d690b21ce3e448143/pandas-2.3.1-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:22c2e866f7209ebc3a8f08d75766566aae02bcc91d196935a1d9e59c7b990ac9", size = 11542731, upload-time = "2025-07-07T19:18:12.619Z" }, - { url = "https://files.pythonhosted.org/packages/80/bf/7938dddc5f01e18e573dcfb0f1b8c9357d9b5fa6ffdee6e605b92efbdff2/pandas-2.3.1-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:3583d348546201aff730c8c47e49bc159833f971c2899d6097bce68b9112a4f1", size = 10790031, upload-time = "2025-07-07T19:18:16.611Z" }, - { url = "https://files.pythonhosted.org/packages/ee/2f/9af748366763b2a494fed477f88051dbf06f56053d5c00eba652697e3f94/pandas-2.3.1-cp310-cp310-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:0f951fbb702dacd390561e0ea45cdd8ecfa7fb56935eb3dd78e306c19104b9b0", size = 11724083, upload-time = "2025-07-07T19:18:20.512Z" }, - { url = "https://files.pythonhosted.org/packages/2c/95/79ab37aa4c25d1e7df953dde407bb9c3e4ae47d154bc0dd1692f3a6dcf8c/pandas-2.3.1-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:cd05b72ec02ebfb993569b4931b2e16fbb4d6ad6ce80224a3ee838387d83a191", size = 12342360, upload-time = "2025-07-07T19:18:23.194Z" }, - { url = "https://files.pythonhosted.org/packages/75/a7/d65e5d8665c12c3c6ff5edd9709d5836ec9b6f80071b7f4a718c6106e86e/pandas-2.3.1-cp310-cp310-musllinux_1_2_aarch64.whl", hash = "sha256:1b916a627919a247d865aed068eb65eb91a344b13f5b57ab9f610b7716c92de1", size = 13202098, upload-time = "2025-07-07T19:18:25.558Z" }, - { url = "https://files.pythonhosted.org/packages/65/f3/4c1dbd754dbaa79dbf8b537800cb2fa1a6e534764fef50ab1f7533226c5c/pandas-2.3.1-cp310-cp310-musllinux_1_2_x86_64.whl", hash = "sha256:fe67dc676818c186d5a3d5425250e40f179c2a89145df477dd82945eaea89e97", size = 13837228, upload-time = "2025-07-07T19:18:28.344Z" }, - { url = "https://files.pythonhosted.org/packages/3f/d6/d7f5777162aa9b48ec3910bca5a58c9b5927cfd9cfde3aa64322f5ba4b9f/pandas-2.3.1-cp310-cp310-win_amd64.whl", hash = "sha256:2eb789ae0274672acbd3c575b0598d213345660120a257b47b5dafdc618aec83", size = 11336561, upload-time = "2025-07-07T19:18:31.211Z" }, - { url = "https://files.pythonhosted.org/packages/76/1c/ccf70029e927e473a4476c00e0d5b32e623bff27f0402d0a92b7fc29bb9f/pandas-2.3.1-cp311-cp311-macosx_10_9_x86_64.whl", hash = "sha256:2b0540963d83431f5ce8870ea02a7430adca100cec8a050f0811f8e31035541b", size = 11566608, upload-time = "2025-07-07T19:18:33.86Z" }, - { url = "https://files.pythonhosted.org/packages/ec/d3/3c37cb724d76a841f14b8f5fe57e5e3645207cc67370e4f84717e8bb7657/pandas-2.3.1-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:fe7317f578c6a153912bd2292f02e40c1d8f253e93c599e82620c7f69755c74f", size = 10823181, upload-time = "2025-07-07T19:18:36.151Z" }, - { url = "https://files.pythonhosted.org/packages/8a/4c/367c98854a1251940edf54a4df0826dcacfb987f9068abf3e3064081a382/pandas-2.3.1-cp311-cp311-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:e6723a27ad7b244c0c79d8e7007092d7c8f0f11305770e2f4cd778b3ad5f9f85", size = 11793570, upload-time = "2025-07-07T19:18:38.385Z" }, - { url = "https://files.pythonhosted.org/packages/07/5f/63760ff107bcf5146eee41b38b3985f9055e710a72fdd637b791dea3495c/pandas-2.3.1-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:3462c3735fe19f2638f2c3a40bd94ec2dc5ba13abbb032dd2fa1f540a075509d", size = 12378887, upload-time = "2025-07-07T19:18:41.284Z" }, - { url = "https://files.pythonhosted.org/packages/15/53/f31a9b4dfe73fe4711c3a609bd8e60238022f48eacedc257cd13ae9327a7/pandas-2.3.1-cp311-cp311-musllinux_1_2_aarch64.whl", hash = "sha256:98bcc8b5bf7afed22cc753a28bc4d9e26e078e777066bc53fac7904ddef9a678", size = 13230957, upload-time = "2025-07-07T19:18:44.187Z" }, - { url = "https://files.pythonhosted.org/packages/e0/94/6fce6bf85b5056d065e0a7933cba2616dcb48596f7ba3c6341ec4bcc529d/pandas-2.3.1-cp311-cp311-musllinux_1_2_x86_64.whl", hash = "sha256:4d544806b485ddf29e52d75b1f559142514e60ef58a832f74fb38e48d757b299", size = 13883883, upload-time = "2025-07-07T19:18:46.498Z" }, - { url = "https://files.pythonhosted.org/packages/c8/7b/bdcb1ed8fccb63d04bdb7635161d0ec26596d92c9d7a6cce964e7876b6c1/pandas-2.3.1-cp311-cp311-win_amd64.whl", hash = "sha256:b3cd4273d3cb3707b6fffd217204c52ed92859533e31dc03b7c5008aa933aaab", size = 11340212, upload-time = "2025-07-07T19:18:49.293Z" }, - { url = "https://files.pythonhosted.org/packages/46/de/b8445e0f5d217a99fe0eeb2f4988070908979bec3587c0633e5428ab596c/pandas-2.3.1-cp312-cp312-macosx_10_13_x86_64.whl", hash = "sha256:689968e841136f9e542020698ee1c4fbe9caa2ed2213ae2388dc7b81721510d3", size = 11588172, upload-time = "2025-07-07T19:18:52.054Z" }, - { url = "https://files.pythonhosted.org/packages/1e/e0/801cdb3564e65a5ac041ab99ea6f1d802a6c325bb6e58c79c06a3f1cd010/pandas-2.3.1-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:025e92411c16cbe5bb2a4abc99732a6b132f439b8aab23a59fa593eb00704232", size = 10717365, upload-time = "2025-07-07T19:18:54.785Z" }, - { url = "https://files.pythonhosted.org/packages/51/a5/c76a8311833c24ae61a376dbf360eb1b1c9247a5d9c1e8b356563b31b80c/pandas-2.3.1-cp312-cp312-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:9b7ff55f31c4fcb3e316e8f7fa194566b286d6ac430afec0d461163312c5841e", size = 11280411, upload-time = "2025-07-07T19:18:57.045Z" }, - { url = "https://files.pythonhosted.org/packages/da/01/e383018feba0a1ead6cf5fe8728e5d767fee02f06a3d800e82c489e5daaf/pandas-2.3.1-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:7dcb79bf373a47d2a40cf7232928eb7540155abbc460925c2c96d2d30b006eb4", size = 11988013, upload-time = "2025-07-07T19:18:59.771Z" }, - { url = "https://files.pythonhosted.org/packages/5b/14/cec7760d7c9507f11c97d64f29022e12a6cc4fc03ac694535e89f88ad2ec/pandas-2.3.1-cp312-cp312-musllinux_1_2_aarch64.whl", hash = "sha256:56a342b231e8862c96bdb6ab97170e203ce511f4d0429589c8ede1ee8ece48b8", size = 12767210, upload-time = "2025-07-07T19:19:02.944Z" }, - { url = "https://files.pythonhosted.org/packages/50/b9/6e2d2c6728ed29fb3d4d4d302504fb66f1a543e37eb2e43f352a86365cdf/pandas-2.3.1-cp312-cp312-musllinux_1_2_x86_64.whl", hash = "sha256:ca7ed14832bce68baef331f4d7f294411bed8efd032f8109d690df45e00c4679", size = 13440571, upload-time = "2025-07-07T19:19:06.82Z" }, - { url = "https://files.pythonhosted.org/packages/80/a5/3a92893e7399a691bad7664d977cb5e7c81cf666c81f89ea76ba2bff483d/pandas-2.3.1-cp312-cp312-win_amd64.whl", hash = "sha256:ac942bfd0aca577bef61f2bc8da8147c4ef6879965ef883d8e8d5d2dc3e744b8", size = 10987601, upload-time = "2025-07-07T19:19:09.589Z" }, - { url = "https://files.pythonhosted.org/packages/32/ed/ff0a67a2c5505e1854e6715586ac6693dd860fbf52ef9f81edee200266e7/pandas-2.3.1-cp313-cp313-macosx_10_13_x86_64.whl", hash = "sha256:9026bd4a80108fac2239294a15ef9003c4ee191a0f64b90f170b40cfb7cf2d22", size = 11531393, upload-time = "2025-07-07T19:19:12.245Z" }, - { url = "https://files.pythonhosted.org/packages/c7/db/d8f24a7cc9fb0972adab0cc80b6817e8bef888cfd0024eeb5a21c0bb5c4a/pandas-2.3.1-cp313-cp313-macosx_11_0_arm64.whl", hash = "sha256:6de8547d4fdb12421e2d047a2c446c623ff4c11f47fddb6b9169eb98ffba485a", size = 10668750, upload-time = "2025-07-07T19:19:14.612Z" }, - { url = "https://files.pythonhosted.org/packages/0f/b0/80f6ec783313f1e2356b28b4fd8d2148c378370045da918c73145e6aab50/pandas-2.3.1-cp313-cp313-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:782647ddc63c83133b2506912cc6b108140a38a37292102aaa19c81c83db2928", size = 11342004, upload-time = "2025-07-07T19:19:16.857Z" }, - { url = "https://files.pythonhosted.org/packages/e9/e2/20a317688435470872885e7fc8f95109ae9683dec7c50be29b56911515a5/pandas-2.3.1-cp313-cp313-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:2ba6aff74075311fc88504b1db890187a3cd0f887a5b10f5525f8e2ef55bfdb9", size = 12050869, upload-time = "2025-07-07T19:19:19.265Z" }, - { url = "https://files.pythonhosted.org/packages/55/79/20d746b0a96c67203a5bee5fb4e00ac49c3e8009a39e1f78de264ecc5729/pandas-2.3.1-cp313-cp313-musllinux_1_2_aarch64.whl", hash = "sha256:e5635178b387bd2ba4ac040f82bc2ef6e6b500483975c4ebacd34bec945fda12", size = 12750218, upload-time = "2025-07-07T19:19:21.547Z" }, - { url = "https://files.pythonhosted.org/packages/7c/0f/145c8b41e48dbf03dd18fdd7f24f8ba95b8254a97a3379048378f33e7838/pandas-2.3.1-cp313-cp313-musllinux_1_2_x86_64.whl", hash = "sha256:6f3bf5ec947526106399a9e1d26d40ee2b259c66422efdf4de63c848492d91bb", size = 13416763, upload-time = "2025-07-07T19:19:23.939Z" }, - { url = "https://files.pythonhosted.org/packages/b2/c0/54415af59db5cdd86a3d3bf79863e8cc3fa9ed265f0745254061ac09d5f2/pandas-2.3.1-cp313-cp313-win_amd64.whl", hash = "sha256:1c78cf43c8fde236342a1cb2c34bcff89564a7bfed7e474ed2fffa6aed03a956", size = 10987482, upload-time = "2025-07-07T19:19:42.699Z" }, - { url = "https://files.pythonhosted.org/packages/48/64/2fd2e400073a1230e13b8cd604c9bc95d9e3b962e5d44088ead2e8f0cfec/pandas-2.3.1-cp313-cp313t-macosx_10_13_x86_64.whl", hash = "sha256:8dfc17328e8da77be3cf9f47509e5637ba8f137148ed0e9b5241e1baf526e20a", size = 12029159, upload-time = "2025-07-07T19:19:26.362Z" }, - { url = "https://files.pythonhosted.org/packages/d8/0a/d84fd79b0293b7ef88c760d7dca69828d867c89b6d9bc52d6a27e4d87316/pandas-2.3.1-cp313-cp313t-macosx_11_0_arm64.whl", hash = "sha256:ec6c851509364c59a5344458ab935e6451b31b818be467eb24b0fe89bd05b6b9", size = 11393287, upload-time = "2025-07-07T19:19:29.157Z" }, - { url = "https://files.pythonhosted.org/packages/50/ae/ff885d2b6e88f3c7520bb74ba319268b42f05d7e583b5dded9837da2723f/pandas-2.3.1-cp313-cp313t-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:911580460fc4884d9b05254b38a6bfadddfcc6aaef856fb5859e7ca202e45275", size = 11309381, upload-time = "2025-07-07T19:19:31.436Z" }, - { url = "https://files.pythonhosted.org/packages/85/86/1fa345fc17caf5d7780d2699985c03dbe186c68fee00b526813939062bb0/pandas-2.3.1-cp313-cp313t-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:2f4d6feeba91744872a600e6edbbd5b033005b431d5ae8379abee5bcfa479fab", size = 11883998, upload-time = "2025-07-07T19:19:34.267Z" }, - { url = "https://files.pythonhosted.org/packages/81/aa/e58541a49b5e6310d89474333e994ee57fea97c8aaa8fc7f00b873059bbf/pandas-2.3.1-cp313-cp313t-musllinux_1_2_aarch64.whl", hash = "sha256:fe37e757f462d31a9cd7580236a82f353f5713a80e059a29753cf938c6775d96", size = 12704705, upload-time = "2025-07-07T19:19:36.856Z" }, - { url = "https://files.pythonhosted.org/packages/d5/f9/07086f5b0f2a19872554abeea7658200824f5835c58a106fa8f2ae96a46c/pandas-2.3.1-cp313-cp313t-musllinux_1_2_x86_64.whl", hash = "sha256:5db9637dbc24b631ff3707269ae4559bce4b7fd75c1c4d7e13f40edc42df4444", size = 13189044, upload-time = "2025-07-07T19:19:39.999Z" }, +sdist = { url = "https://files.pythonhosted.org/packages/33/01/d40b85317f86cf08d853a4f495195c73815fdf205eef3993821720274518/pandas-2.3.3.tar.gz", hash = "sha256:e05e1af93b977f7eafa636d043f9f94c7ee3ac81af99c13508215942e64c993b", size = 4495223, upload-time = "2025-09-29T23:34:51.853Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/3d/f7/f425a00df4fcc22b292c6895c6831c0c8ae1d9fac1e024d16f98a9ce8749/pandas-2.3.3-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:376c6446ae31770764215a6c937f72d917f214b43560603cd60da6408f183b6c", size = 11555763, upload-time = "2025-09-29T23:16:53.287Z" }, + { url = "https://files.pythonhosted.org/packages/13/4f/66d99628ff8ce7857aca52fed8f0066ce209f96be2fede6cef9f84e8d04f/pandas-2.3.3-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:e19d192383eab2f4ceb30b412b22ea30690c9e618f78870357ae1d682912015a", size = 10801217, upload-time = "2025-09-29T23:17:04.522Z" }, + { url = "https://files.pythonhosted.org/packages/1d/03/3fc4a529a7710f890a239cc496fc6d50ad4a0995657dccc1d64695adb9f4/pandas-2.3.3-cp310-cp310-manylinux_2_24_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:5caf26f64126b6c7aec964f74266f435afef1c1b13da3b0636c7518a1fa3e2b1", size = 12148791, upload-time = "2025-09-29T23:17:18.444Z" }, + { url = "https://files.pythonhosted.org/packages/40/a8/4dac1f8f8235e5d25b9955d02ff6f29396191d4e665d71122c3722ca83c5/pandas-2.3.3-cp310-cp310-manylinux_2_24_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:dd7478f1463441ae4ca7308a70e90b33470fa593429f9d4c578dd00d1fa78838", size = 12769373, upload-time = "2025-09-29T23:17:35.846Z" }, + { url = "https://files.pythonhosted.org/packages/df/91/82cc5169b6b25440a7fc0ef3a694582418d875c8e3ebf796a6d6470aa578/pandas-2.3.3-cp310-cp310-musllinux_1_2_aarch64.whl", hash = "sha256:4793891684806ae50d1288c9bae9330293ab4e083ccd1c5e383c34549c6e4250", size = 13200444, upload-time = "2025-09-29T23:17:49.341Z" }, + { url = "https://files.pythonhosted.org/packages/10/ae/89b3283800ab58f7af2952704078555fa60c807fff764395bb57ea0b0dbd/pandas-2.3.3-cp310-cp310-musllinux_1_2_x86_64.whl", hash = "sha256:28083c648d9a99a5dd035ec125d42439c6c1c525098c58af0fc38dd1a7a1b3d4", size = 13858459, upload-time = "2025-09-29T23:18:03.722Z" }, + { url = "https://files.pythonhosted.org/packages/85/72/530900610650f54a35a19476eca5104f38555afccda1aa11a92ee14cb21d/pandas-2.3.3-cp310-cp310-win_amd64.whl", hash = "sha256:503cf027cf9940d2ceaa1a93cfb5f8c8c7e6e90720a2850378f0b3f3b1e06826", size = 11346086, upload-time = "2025-09-29T23:18:18.505Z" }, + { url = "https://files.pythonhosted.org/packages/c1/fa/7ac648108144a095b4fb6aa3de1954689f7af60a14cf25583f4960ecb878/pandas-2.3.3-cp311-cp311-macosx_10_9_x86_64.whl", hash = "sha256:602b8615ebcc4a0c1751e71840428ddebeb142ec02c786e8ad6b1ce3c8dec523", size = 11578790, upload-time = "2025-09-29T23:18:30.065Z" }, + { url = "https://files.pythonhosted.org/packages/9b/35/74442388c6cf008882d4d4bdfc4109be87e9b8b7ccd097ad1e7f006e2e95/pandas-2.3.3-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:8fe25fc7b623b0ef6b5009149627e34d2a4657e880948ec3c840e9402e5c1b45", size = 10833831, upload-time = "2025-09-29T23:38:56.071Z" }, + { url = "https://files.pythonhosted.org/packages/fe/e4/de154cbfeee13383ad58d23017da99390b91d73f8c11856f2095e813201b/pandas-2.3.3-cp311-cp311-manylinux_2_24_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:b468d3dad6ff947df92dcb32ede5b7bd41a9b3cceef0a30ed925f6d01fb8fa66", size = 12199267, upload-time = "2025-09-29T23:18:41.627Z" }, + { url = "https://files.pythonhosted.org/packages/bf/c9/63f8d545568d9ab91476b1818b4741f521646cbdd151c6efebf40d6de6f7/pandas-2.3.3-cp311-cp311-manylinux_2_24_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:b98560e98cb334799c0b07ca7967ac361a47326e9b4e5a7dfb5ab2b1c9d35a1b", size = 12789281, upload-time = "2025-09-29T23:18:56.834Z" }, + { url = "https://files.pythonhosted.org/packages/f2/00/a5ac8c7a0e67fd1a6059e40aa08fa1c52cc00709077d2300e210c3ce0322/pandas-2.3.3-cp311-cp311-musllinux_1_2_aarch64.whl", hash = "sha256:1d37b5848ba49824e5c30bedb9c830ab9b7751fd049bc7914533e01c65f79791", size = 13240453, upload-time = "2025-09-29T23:19:09.247Z" }, + { url = "https://files.pythonhosted.org/packages/27/4d/5c23a5bc7bd209231618dd9e606ce076272c9bc4f12023a70e03a86b4067/pandas-2.3.3-cp311-cp311-musllinux_1_2_x86_64.whl", hash = "sha256:db4301b2d1f926ae677a751eb2bd0e8c5f5319c9cb3f88b0becbbb0b07b34151", size = 13890361, upload-time = "2025-09-29T23:19:25.342Z" }, + { url = "https://files.pythonhosted.org/packages/8e/59/712db1d7040520de7a4965df15b774348980e6df45c129b8c64d0dbe74ef/pandas-2.3.3-cp311-cp311-win_amd64.whl", hash = "sha256:f086f6fe114e19d92014a1966f43a3e62285109afe874f067f5abbdcbb10e59c", size = 11348702, upload-time = "2025-09-29T23:19:38.296Z" }, + { url = "https://files.pythonhosted.org/packages/9c/fb/231d89e8637c808b997d172b18e9d4a4bc7bf31296196c260526055d1ea0/pandas-2.3.3-cp312-cp312-macosx_10_13_x86_64.whl", hash = "sha256:6d21f6d74eb1725c2efaa71a2bfc661a0689579b58e9c0ca58a739ff0b002b53", size = 11597846, upload-time = "2025-09-29T23:19:48.856Z" }, + { url = "https://files.pythonhosted.org/packages/5c/bd/bf8064d9cfa214294356c2d6702b716d3cf3bb24be59287a6a21e24cae6b/pandas-2.3.3-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:3fd2f887589c7aa868e02632612ba39acb0b8948faf5cc58f0850e165bd46f35", size = 10729618, upload-time = "2025-09-29T23:39:08.659Z" }, + { url = "https://files.pythonhosted.org/packages/57/56/cf2dbe1a3f5271370669475ead12ce77c61726ffd19a35546e31aa8edf4e/pandas-2.3.3-cp312-cp312-manylinux_2_24_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:ecaf1e12bdc03c86ad4a7ea848d66c685cb6851d807a26aa245ca3d2017a1908", size = 11737212, upload-time = "2025-09-29T23:19:59.765Z" }, + { url = "https://files.pythonhosted.org/packages/e5/63/cd7d615331b328e287d8233ba9fdf191a9c2d11b6af0c7a59cfcec23de68/pandas-2.3.3-cp312-cp312-manylinux_2_24_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:b3d11d2fda7eb164ef27ffc14b4fcab16a80e1ce67e9f57e19ec0afaf715ba89", size = 12362693, upload-time = "2025-09-29T23:20:14.098Z" }, + { url = "https://files.pythonhosted.org/packages/a6/de/8b1895b107277d52f2b42d3a6806e69cfef0d5cf1d0ba343470b9d8e0a04/pandas-2.3.3-cp312-cp312-musllinux_1_2_aarch64.whl", hash = "sha256:a68e15f780eddf2b07d242e17a04aa187a7ee12b40b930bfdd78070556550e98", size = 12771002, upload-time = "2025-09-29T23:20:26.76Z" }, + { url = "https://files.pythonhosted.org/packages/87/21/84072af3187a677c5893b170ba2c8fbe450a6ff911234916da889b698220/pandas-2.3.3-cp312-cp312-musllinux_1_2_x86_64.whl", hash = "sha256:371a4ab48e950033bcf52b6527eccb564f52dc826c02afd9a1bc0ab731bba084", size = 13450971, upload-time = "2025-09-29T23:20:41.344Z" }, + { url = "https://files.pythonhosted.org/packages/86/41/585a168330ff063014880a80d744219dbf1dd7a1c706e75ab3425a987384/pandas-2.3.3-cp312-cp312-win_amd64.whl", hash = "sha256:a16dcec078a01eeef8ee61bf64074b4e524a2a3f4b3be9326420cabe59c4778b", size = 10992722, upload-time = "2025-09-29T23:20:54.139Z" }, + { url = "https://files.pythonhosted.org/packages/cd/4b/18b035ee18f97c1040d94debd8f2e737000ad70ccc8f5513f4eefad75f4b/pandas-2.3.3-cp313-cp313-macosx_10_13_x86_64.whl", hash = "sha256:56851a737e3470de7fa88e6131f41281ed440d29a9268dcbf0002da5ac366713", size = 11544671, upload-time = "2025-09-29T23:21:05.024Z" }, + { url = "https://files.pythonhosted.org/packages/31/94/72fac03573102779920099bcac1c3b05975c2cb5f01eac609faf34bed1ca/pandas-2.3.3-cp313-cp313-macosx_11_0_arm64.whl", hash = "sha256:bdcd9d1167f4885211e401b3036c0c8d9e274eee67ea8d0758a256d60704cfe8", size = 10680807, upload-time = "2025-09-29T23:21:15.979Z" }, + { url = "https://files.pythonhosted.org/packages/16/87/9472cf4a487d848476865321de18cc8c920b8cab98453ab79dbbc98db63a/pandas-2.3.3-cp313-cp313-manylinux_2_24_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:e32e7cc9af0f1cc15548288a51a3b681cc2a219faa838e995f7dc53dbab1062d", size = 11709872, upload-time = "2025-09-29T23:21:27.165Z" }, + { url = "https://files.pythonhosted.org/packages/15/07/284f757f63f8a8d69ed4472bfd85122bd086e637bf4ed09de572d575a693/pandas-2.3.3-cp313-cp313-manylinux_2_24_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:318d77e0e42a628c04dc56bcef4b40de67918f7041c2b061af1da41dcff670ac", size = 12306371, upload-time = "2025-09-29T23:21:40.532Z" }, + { url = "https://files.pythonhosted.org/packages/33/81/a3afc88fca4aa925804a27d2676d22dcd2031c2ebe08aabd0ae55b9ff282/pandas-2.3.3-cp313-cp313-musllinux_1_2_aarch64.whl", hash = "sha256:4e0a175408804d566144e170d0476b15d78458795bb18f1304fb94160cabf40c", size = 12765333, upload-time = "2025-09-29T23:21:55.77Z" }, + { url = "https://files.pythonhosted.org/packages/8d/0f/b4d4ae743a83742f1153464cf1a8ecfafc3ac59722a0b5c8602310cb7158/pandas-2.3.3-cp313-cp313-musllinux_1_2_x86_64.whl", hash = "sha256:93c2d9ab0fc11822b5eece72ec9587e172f63cff87c00b062f6e37448ced4493", size = 13418120, upload-time = "2025-09-29T23:22:10.109Z" }, + { url = "https://files.pythonhosted.org/packages/4f/c7/e54682c96a895d0c808453269e0b5928a07a127a15704fedb643e9b0a4c8/pandas-2.3.3-cp313-cp313-win_amd64.whl", hash = "sha256:f8bfc0e12dc78f777f323f55c58649591b2cd0c43534e8355c51d3fede5f4dee", size = 10993991, upload-time = "2025-09-29T23:25:04.889Z" }, + { url = "https://files.pythonhosted.org/packages/f9/ca/3f8d4f49740799189e1395812f3bf23b5e8fc7c190827d55a610da72ce55/pandas-2.3.3-cp313-cp313t-macosx_10_13_x86_64.whl", hash = "sha256:75ea25f9529fdec2d2e93a42c523962261e567d250b0013b16210e1d40d7c2e5", size = 12048227, upload-time = "2025-09-29T23:22:24.343Z" }, + { url = "https://files.pythonhosted.org/packages/0e/5a/f43efec3e8c0cc92c4663ccad372dbdff72b60bdb56b2749f04aa1d07d7e/pandas-2.3.3-cp313-cp313t-macosx_11_0_arm64.whl", hash = "sha256:74ecdf1d301e812db96a465a525952f4dde225fdb6d8e5a521d47e1f42041e21", size = 11411056, upload-time = "2025-09-29T23:22:37.762Z" }, + { url = "https://files.pythonhosted.org/packages/46/b1/85331edfc591208c9d1a63a06baa67b21d332e63b7a591a5ba42a10bb507/pandas-2.3.3-cp313-cp313t-manylinux_2_24_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:6435cb949cb34ec11cc9860246ccb2fdc9ecd742c12d3304989017d53f039a78", size = 11645189, upload-time = "2025-09-29T23:22:51.688Z" }, + { url = "https://files.pythonhosted.org/packages/44/23/78d645adc35d94d1ac4f2a3c4112ab6f5b8999f4898b8cdf01252f8df4a9/pandas-2.3.3-cp313-cp313t-manylinux_2_24_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:900f47d8f20860de523a1ac881c4c36d65efcb2eb850e6948140fa781736e110", size = 12121912, upload-time = "2025-09-29T23:23:05.042Z" }, + { url = "https://files.pythonhosted.org/packages/53/da/d10013df5e6aaef6b425aa0c32e1fc1f3e431e4bcabd420517dceadce354/pandas-2.3.3-cp313-cp313t-musllinux_1_2_aarch64.whl", hash = "sha256:a45c765238e2ed7d7c608fc5bc4a6f88b642f2f01e70c0c23d2224dd21829d86", size = 12712160, upload-time = "2025-09-29T23:23:28.57Z" }, + { url = "https://files.pythonhosted.org/packages/bd/17/e756653095a083d8a37cbd816cb87148debcfcd920129b25f99dd8d04271/pandas-2.3.3-cp313-cp313t-musllinux_1_2_x86_64.whl", hash = "sha256:c4fc4c21971a1a9f4bdb4c73978c7f7256caa3e62b323f70d6cb80db583350bc", size = 13199233, upload-time = "2025-09-29T23:24:24.876Z" }, + { url = "https://files.pythonhosted.org/packages/04/fd/74903979833db8390b73b3a8a7d30d146d710bd32703724dd9083950386f/pandas-2.3.3-cp314-cp314-macosx_10_13_x86_64.whl", hash = "sha256:ee15f284898e7b246df8087fc82b87b01686f98ee67d85a17b7ab44143a3a9a0", size = 11540635, upload-time = "2025-09-29T23:25:52.486Z" }, + { url = "https://files.pythonhosted.org/packages/21/00/266d6b357ad5e6d3ad55093a7e8efc7dd245f5a842b584db9f30b0f0a287/pandas-2.3.3-cp314-cp314-macosx_11_0_arm64.whl", hash = "sha256:1611aedd912e1ff81ff41c745822980c49ce4a7907537be8692c8dbc31924593", size = 10759079, upload-time = "2025-09-29T23:26:33.204Z" }, + { url = "https://files.pythonhosted.org/packages/ca/05/d01ef80a7a3a12b2f8bbf16daba1e17c98a2f039cbc8e2f77a2c5a63d382/pandas-2.3.3-cp314-cp314-manylinux_2_24_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:6d2cefc361461662ac48810cb14365a365ce864afe85ef1f447ff5a1e99ea81c", size = 11814049, upload-time = "2025-09-29T23:27:15.384Z" }, + { url = "https://files.pythonhosted.org/packages/15/b2/0e62f78c0c5ba7e3d2c5945a82456f4fac76c480940f805e0b97fcbc2f65/pandas-2.3.3-cp314-cp314-manylinux_2_24_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:ee67acbbf05014ea6c763beb097e03cd629961c8a632075eeb34247120abcb4b", size = 12332638, upload-time = "2025-09-29T23:27:51.625Z" }, + { url = "https://files.pythonhosted.org/packages/c5/33/dd70400631b62b9b29c3c93d2feee1d0964dc2bae2e5ad7a6c73a7f25325/pandas-2.3.3-cp314-cp314-musllinux_1_2_aarch64.whl", hash = "sha256:c46467899aaa4da076d5abc11084634e2d197e9460643dd455ac3db5856b24d6", size = 12886834, upload-time = "2025-09-29T23:28:21.289Z" }, + { url = "https://files.pythonhosted.org/packages/d3/18/b5d48f55821228d0d2692b34fd5034bb185e854bdb592e9c640f6290e012/pandas-2.3.3-cp314-cp314-musllinux_1_2_x86_64.whl", hash = "sha256:6253c72c6a1d990a410bc7de641d34053364ef8bcd3126f7e7450125887dffe3", size = 13409925, upload-time = "2025-09-29T23:28:58.261Z" }, + { url = "https://files.pythonhosted.org/packages/a6/3d/124ac75fcd0ecc09b8fdccb0246ef65e35b012030defb0e0eba2cbbbe948/pandas-2.3.3-cp314-cp314-win_amd64.whl", hash = "sha256:1b07204a219b3b7350abaae088f451860223a52cfb8a6c53358e7948735158e5", size = 11109071, upload-time = "2025-09-29T23:32:27.484Z" }, + { url = "https://files.pythonhosted.org/packages/89/9c/0e21c895c38a157e0faa1fb64587a9226d6dd46452cac4532d80c3c4a244/pandas-2.3.3-cp314-cp314t-macosx_10_13_x86_64.whl", hash = "sha256:2462b1a365b6109d275250baaae7b760fd25c726aaca0054649286bcfbb3e8ec", size = 12048504, upload-time = "2025-09-29T23:29:31.47Z" }, + { url = "https://files.pythonhosted.org/packages/d7/82/b69a1c95df796858777b68fbe6a81d37443a33319761d7c652ce77797475/pandas-2.3.3-cp314-cp314t-macosx_11_0_arm64.whl", hash = "sha256:0242fe9a49aa8b4d78a4fa03acb397a58833ef6199e9aa40a95f027bb3a1b6e7", size = 11410702, upload-time = "2025-09-29T23:29:54.591Z" }, + { url = "https://files.pythonhosted.org/packages/f9/88/702bde3ba0a94b8c73a0181e05144b10f13f29ebfc2150c3a79062a8195d/pandas-2.3.3-cp314-cp314t-manylinux_2_24_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:a21d830e78df0a515db2b3d2f5570610f5e6bd2e27749770e8bb7b524b89b450", size = 11634535, upload-time = "2025-09-29T23:30:21.003Z" }, + { url = "https://files.pythonhosted.org/packages/a4/1e/1bac1a839d12e6a82ec6cb40cda2edde64a2013a66963293696bbf31fbbb/pandas-2.3.3-cp314-cp314t-manylinux_2_24_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:2e3ebdb170b5ef78f19bfb71b0dc5dc58775032361fa188e814959b74d726dd5", size = 12121582, upload-time = "2025-09-29T23:30:43.391Z" }, + { url = "https://files.pythonhosted.org/packages/44/91/483de934193e12a3b1d6ae7c8645d083ff88dec75f46e827562f1e4b4da6/pandas-2.3.3-cp314-cp314t-musllinux_1_2_aarch64.whl", hash = "sha256:d051c0e065b94b7a3cea50eb1ec32e912cd96dba41647eb24104b6c6c14c5788", size = 12699963, upload-time = "2025-09-29T23:31:10.009Z" }, + { url = "https://files.pythonhosted.org/packages/70/44/5191d2e4026f86a2a109053e194d3ba7a31a2d10a9c2348368c63ed4e85a/pandas-2.3.3-cp314-cp314t-musllinux_1_2_x86_64.whl", hash = "sha256:3869faf4bd07b3b66a9f462417d0ca3a9df29a9f6abd5d0d0dbab15dac7abe87", size = 13202175, upload-time = "2025-09-29T23:31:59.173Z" }, ] [[package]] @@ -2620,7 +2633,7 @@ bedrock = [{ name = "boto3", specifier = ">=1.34.92,<2" }] cloud-export-to-parquet = [ { name = "boto3", specifier = ">=1.34.89,<2" }, { name = "numpy", marker = "python_full_version >= '3.10' and python_full_version < '3.13'", specifier = ">=1.26.0,<2" }, - { name = "pandas", marker = "python_full_version >= '3.10' and python_full_version < '4'", specifier = ">=2.2.2,<3" }, + { name = "pandas", marker = "python_full_version >= '3.10' and python_full_version < '4'", specifier = ">=2.3.3,<3" }, { name = "pyarrow", specifier = ">=19.0.1" }, ] dev = [