
Sync activities in ProcessPoolExecutor crash with PickleError when retry_policy has non_retryable_error_types #1350

@lambyqq

Bug Report

Description

RetryPolicy.from_proto() assigns proto.non_retryable_error_types directly as a protobuf RepeatedScalarContainer instead of converting it to a Python list. This makes activity.info().retry_policy non-picklable, which crashes any sync activity running in a ProcessPoolExecutor when the workflow specifies non_retryable_error_types in the retry policy.

Root Cause

In temporalio/common.py, RetryPolicy.from_proto() does:

non_retryable_error_types=proto.non_retryable_error_types or None,

proto.non_retryable_error_types is a google._upb._message.RepeatedScalarContainer, which is not picklable. It should be:

non_retryable_error_types=list(proto.non_retryable_error_types) or None,
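The difference between the two expressions can be sketched with the standard library alone. FakeRepeatedContainer is a hypothetical stand-in, not a real protobuf type: subclassing list is only a convenient way to get sequence behavior, and its __reduce__ raises the same PickleError message. The sketch also shows why the crash only appears when non_retryable_error_types is set. An empty container is falsy, so both expressions return None.

```python
import pickle
from typing import Any, List, Optional


class FakeRepeatedContainer(list):
    """Hypothetical stand-in for protobuf's RepeatedScalarContainer.

    It iterates and supports len()/truthiness like a sequence, but
    refuses to be pickled, like the real container.
    """

    def __reduce__(self):
        raise pickle.PickleError(
            "can't pickle repeated message fields, convert to list first"
        )


def current_expression(field: FakeRepeatedContainer) -> Any:
    # Mirrors `proto.non_retryable_error_types or None`: a non-empty
    # container is truthy, so the container object itself is returned.
    return field or None


def fixed_expression(field: FakeRepeatedContainer) -> Optional[List[str]]:
    # Mirrors `list(proto.non_retryable_error_types) or None`: copy into a
    # plain list first; an empty list still collapses to None.
    return list(field) or None


def is_picklable(value: Any) -> bool:
    try:
        pickle.dumps(value)
        return True
    except pickle.PickleError:
        return False


populated = FakeRepeatedContainer(["SomeError"])
empty = FakeRepeatedContainer()

print(is_picklable(current_expression(populated)))  # False
print(is_picklable(fixed_expression(populated)))  # True
print(current_expression(empty), fixed_expression(empty))  # None None
```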

Introduced

v1.18.0 (PR #1055, "Added retry policy to activity info"), which populates activity.info().retry_policy via RetryPolicy.from_proto(). The from_proto() method has had this issue since it was written, but it only became user-facing once activity.info() started including the retry policy, because the activity info object is pickled when it is sent to subprocess workers.
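Why the nesting matters can also be sketched with the standard library. A ProcessPoolExecutor serializes call arguments through multiprocessing's ForkingPickler (the dumps call in the reported traceback), and pickling walks the whole object graph, so a single unpicklable field deep inside the info object is enough to fail the call. FakeActivityInfo, FakeRetryPolicy and FakeRepeatedContainer are hypothetical stand-ins, not the SDK's classes.

```python
import pickle
from dataclasses import dataclass
from multiprocessing.reduction import ForkingPickler
from typing import Optional, Sequence


class FakeRepeatedContainer(list):
    """Hypothetical stand-in for an unpicklable protobuf repeated container."""

    def __reduce__(self):
        raise pickle.PickleError(
            "can't pickle repeated message fields, convert to list first"
        )


@dataclass
class FakeRetryPolicy:
    # Hypothetical, trimmed-down analogue of temporalio.common.RetryPolicy.
    maximum_attempts: int = 0
    non_retryable_error_types: Optional[Sequence[str]] = None


@dataclass
class FakeActivityInfo:
    # Hypothetical analogue of activity.Info with only the fields needed here.
    activity_type: str
    retry_policy: Optional[FakeRetryPolicy] = None


def survives_process_boundary(info: FakeActivityInfo) -> bool:
    # Same serializer the multiprocessing queue feeder thread uses.
    try:
        ForkingPickler.dumps(info)
        return True
    except pickle.PickleError:
        return False


cases = {
    # Roughly analogous to SDK versions whose Info had no retry_policy.
    "no retry policy": FakeActivityInfo("my_sync_activity"),
    "policy without error types": FakeActivityInfo(
        "my_sync_activity", FakeRetryPolicy(maximum_attempts=1)
    ),
    "policy with container": FakeActivityInfo(
        "my_sync_activity",
        FakeRetryPolicy(1, FakeRepeatedContainer(["SomeError"])),
    ),
    "policy with plain list": FakeActivityInfo(
        "my_sync_activity", FakeRetryPolicy(1, ["SomeError"])
    ),
}

results = {name: survives_process_boundary(info) for name, info in cases.items()}
for name, ok in results.items():
    print(f"{name}: {'PASS' if ok else 'FAIL'}")
```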

Version Matrix

| SDK Version | Result |
|---|---|
| 1.8.0 | PASS (activity.Info has no retry_policy field) |
| 1.17.0 | PASS (same) |
| 1.18.0 | FAIL: _pickle.PickleError: can't pickle repeated message fields, convert to list first |
| 1.23.0 | FAIL: same error |

Minimal Reproduction

Requires only pip install temporalio. No separately running Temporal server is needed: WorkflowEnvironment.start_local() starts a local development server, downloading it on first use.

"""Repro: sync activity in ProcessPoolExecutor crashes when
retry_policy has non_retryable_error_types."""
import asyncio
import multiprocessing
from concurrent.futures import ProcessPoolExecutor
from datetime import timedelta

from temporalio import activity, workflow
from temporalio.common import RetryPolicy
from temporalio.testing import WorkflowEnvironment
from temporalio.worker import SharedStateManager, Worker


@activity.defn
def my_sync_activity() -> str:
    return "ok"


@workflow.defn
class MyWorkflow:
    @workflow.run
    async def run(self) -> str:
        return await workflow.execute_activity(
            my_sync_activity,
            start_to_close_timeout=timedelta(seconds=10),
            retry_policy=RetryPolicy(
                maximum_attempts=1,
                non_retryable_error_types=["SomeError"],
            ),
        )


async def main():
    async with await WorkflowEnvironment.start_local() as env:
        with ProcessPoolExecutor(
            max_workers=1,
            mp_context=multiprocessing.get_context("spawn"),
        ) as executor:
            mgr = SharedStateManager.create_from_multiprocessing(
                multiprocessing.Manager())
            async with Worker(
                env.client,
                task_queue="repro-queue",
                workflows=[MyWorkflow],
                activities=[my_sync_activity],
                activity_executor=executor,
                shared_state_manager=mgr,
            ):
                result = await env.client.execute_workflow(
                    MyWorkflow.run,
                    id="repro-pickle-bug",
                    task_queue="repro-queue",
                    task_timeout=timedelta(seconds=15),
                )
                print(f"Result: {result}")


if __name__ == "__main__":
    asyncio.run(main())

Error

_pickle.PickleError: can't pickle repeated message fields, convert to list first

Full traceback:

concurrent.futures.process._RemoteTraceback:
"""
Traceback (most recent call last):
  File ".../multiprocessing/queues.py", line 244, in _feed
    obj = _ForkingPickler.dumps(obj)
  File ".../multiprocessing/reduction.py", line 51, in dumps
    cls(buf, protocol).dump(obj)
_pickle.PickleError: can't pickle repeated message fields, convert to list first
"""

Suggested Fix

In RetryPolicy.from_proto(), wrap with list():

non_retryable_error_types=list(proto.non_retryable_error_types) or None,

Workaround

Monkey-patch RetryPolicy.from_proto at import time in the main worker process (the process that pickles the activity info), before the Worker starts:

from temporalio.common import RetryPolicy

_original_from_proto = RetryPolicy.from_proto

@staticmethod
def _patched_from_proto(proto) -> RetryPolicy:
    rp = _original_from_proto(proto)
    if rp.non_retryable_error_types is not None:
        rp.non_retryable_error_types = list(rp.non_retryable_error_types)
    return rp

RetryPolicy.from_proto = _patched_from_proto
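The workaround relies on a standard Python pattern: take the function behind the original staticmethod, wrap it, and assign a new staticmethod back to the class. The sketch below exercises that pattern on hypothetical stand-ins (FakeRetryPolicy, with a plain dict in place of the proto message). It also shows a caveat that holds for any monkey-patch: callers that look up from_proto on the class at call time see the patch, but a reference captured before patching keeps calling the original, which is one reason to apply the patch early.

```python
import pickle
from dataclasses import dataclass
from typing import Any, Optional, Sequence


class FakeRepeatedContainer(list):
    """Hypothetical stand-in for an unpicklable protobuf repeated container."""

    def __reduce__(self):
        raise pickle.PickleError(
            "can't pickle repeated message fields, convert to list first"
        )


@dataclass
class FakeRetryPolicy:
    """Hypothetical stand-in for temporalio.common.RetryPolicy."""

    maximum_attempts: int = 0
    non_retryable_error_types: Optional[Sequence[str]] = None

    @staticmethod
    def from_proto(proto: dict) -> "FakeRetryPolicy":
        # Reproduces the bug: the container is stored as-is.
        return FakeRetryPolicy(
            maximum_attempts=proto["maximum_attempts"],
            non_retryable_error_types=proto["non_retryable_error_types"] or None,
        )


# A reference captured before patching keeps pointing at the original.
captured_early = FakeRetryPolicy.from_proto

# The workaround pattern, applied to the stand-in.
_original_from_proto = FakeRetryPolicy.from_proto


@staticmethod
def _patched_from_proto(proto: dict) -> FakeRetryPolicy:
    rp = _original_from_proto(proto)
    if rp.non_retryable_error_types is not None:
        rp.non_retryable_error_types = list(rp.non_retryable_error_types)
    return rp


FakeRetryPolicy.from_proto = _patched_from_proto


def is_picklable(value: Any) -> bool:
    try:
        pickle.dumps(value)
        return True
    except pickle.PickleError:
        return False


proto = {
    "maximum_attempts": 1,
    "non_retryable_error_types": FakeRepeatedContainer(["SomeError"]),
}
via_class = FakeRetryPolicy.from_proto(proto)  # looked up after patching
via_early_ref = captured_early(proto)  # bypasses the patch

print(is_picklable(via_class), is_picklable(via_early_ref))
```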
