Conversation
"""Base converter to/from single payload/value."""

@abstractmethod
async def encode(self, value: Any) -> Optional[temporalio.api.common.v1.Payload]:
Not convinced this needs to be an async method; it shouldn't do any IO or CPU-intensive operations.
The DataConverter should be async because it's responsible for compression and encryption.
Y'all chose to make it async-capable in TypeScript; why not here? What are the concerns with making it async?
I think it needs to be async unless we plan to separate out bytes-only stuff soon, as that could easily be CPU heavy for encryption (or need to do IO by reading keys from some source, etc.).
We're going to make the TS PayloadConverter synchronous.
In TS, async framework code could change the order of workflow execution which could break determinism but that might not apply here, and we'd need to restructure things a bit for that.
I would suggest that users don't do any CPU or IO in the PayloadConverter and instead do it in the higher level DataConverter where multiple payloads could be compressed together.
> We're going to make the TS PayloadConverter synchronous.

Why? What are the concerns with making it async?

> In TS, async framework code could change the order of workflow execution which could break determinism but that might not apply here, and we'd need to restructure things a bit for that.

Why is this a problem for payload converters but not data converters?

> I would suggest that users don't do any CPU or IO in the PayloadConverter and instead do it in the higher level DataConverter where multiple payloads could be compressed together.

While we might suggest this, the question is why it's OK for data converters to be async but not payload converters.
In TS we're going to separate conversion into 2 steps, one sync that maps object -> payload and runs in workflow context, and the other async that maps payloads -> payloads and runs outside of workflow context.
I see. I think if we were using subprocess sandboxing or similar we might need to do the same (and we'd also have to take the class of a data converter instead of an instance of it), but I am hoping our sandboxing does not cross some boundary requiring serialization to occur on the workflow side of that boundary. Though a method on a dataclass could provide a way to escape the sandbox. Will think on it.
Even if it does, I think the async payload conversion in the asyncio event loop will be in a predictable order.
The order is predictable but if you change the workflow data converter it's essentially a breaking non-deterministic change.
Here I don't expect the data converter to be inline with the workflow wrt determinism, nor am I any more concerned about a data converter changing mid-use here than about other types of mid-use data converter changes.
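The layering the thread converges on (a synchronous PayloadConverter that runs deterministically in workflow context, wrapped by an asynchronous DataConverter that does the compression/encryption/key IO outside it) could be sketched roughly like this. All class and method names here are hypothetical stand-ins, not the SDK's actual API:

```python
import asyncio
import json
import zlib
from abc import ABC, abstractmethod
from typing import Any, List, Tuple


class PayloadConverter(ABC):
    """Hypothetical sync layer: a pure value <-> bytes mapping.

    Kept synchronous so it can run deterministically in workflow
    context without yielding to the event loop.
    """

    @abstractmethod
    def encode(self, value: Any) -> Tuple[dict, bytes]:
        ...


class JSONPayloadConverter(PayloadConverter):
    def encode(self, value: Any) -> Tuple[dict, bytes]:
        return {"encoding": "json/plain"}, json.dumps(value).encode()


class DataConverter:
    """Hypothetical async layer: compression/encryption/key IO live
    here, outside workflow context, where async is safe."""

    def __init__(self, payload_converter: PayloadConverter) -> None:
        self._payload_converter = payload_converter

    async def encode(self, values: List[Any]) -> List[Tuple[dict, bytes]]:
        # Sync, deterministic step: values -> payloads
        payloads = [self._payload_converter.encode(v) for v in values]
        loop = asyncio.get_running_loop()
        # CPU-heavy step runs in an executor rather than blocking the loop
        return [
            ({**meta, "compression": "zlib"},
             await loop.run_in_executor(None, zlib.compress, data))
            for meta, data in payloads
        ]


async def main() -> None:
    converter = DataConverter(JSONPayloadConverter())
    [(meta, data)] = await converter.encode([{"hello": "world"}])
    assert zlib.decompress(data) == b'{"hello": "world"}'


asyncio.run(main())
```

This mirrors the two-step TS plan described above: the sync step maps object -> payload in workflow context, and the async step maps payloads -> payloads outside it.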
await assert_payload(False, "json/plain", "false")

# Unknown type
with pytest.raises(RuntimeError) as excinfo:
I think TypeError is better here.
Not sure it's a type error necessarily vs a lack of converter. I have now documented the error.
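A minimal illustration of the distinction being argued (the failure is "no converter registered", not "wrong type"); the composite converter here is a hypothetical stand-in, not the SDK's actual class:

```python
import json
from typing import Any, Optional


class JSONOnlyConverter:
    """Handles only JSON-serializable values; returns None otherwise."""

    def encode(self, value: Any) -> Optional[bytes]:
        try:
            return json.dumps(value).encode()
        except TypeError:
            return None


class CompositeConverter:
    """Hypothetical composite: tries each converter in order."""

    def __init__(self, *converters: Any) -> None:
        self._converters = converters

    def encode(self, value: Any) -> bytes:
        for converter in self._converters:
            payload = converter.encode(value)
            if payload is not None:
                return payload
        # RuntimeError rather than TypeError: the value's type may be
        # perfectly valid, we just have no converter registered for it
        raise RuntimeError(
            f"No known converter for value of type {type(value).__name__}"
        )


converter = CompositeConverter(JSONOnlyConverter())
assert converter.encode([1, 2]) == b"[1, 2]"
try:
    converter.encode(object())  # nothing can serialize a bare object
except RuntimeError as err:
    print(err)  # No known converter for value of type object
```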
…date python version in CI
# TODO(cretz): Should this be a var that can be changed instead? If so, can it
# be replaced _after_ client creation? We'd just have to fallback to this
# default at conversion time instead of instantiation time.
If we do that, IMO it'd make sense to keep this and also introduce some kind of set_default so that you don't wipe out the original defaults; they're always easily available if you want to restore them for some reason.
Yeah, I think I'd like to keep it an immutable default. We have ways to set the default at the client level which then applies to all things within anyways. You don't need to set the "default default" I don't think (we don't allow it in Go).
The other SDKs have an immutable default converter, but I see Spencer's point about setting the "default default" being more convenient than having to pass data converters everywhere.
👍 Keeping it immutable here. I have exposed converters from the default for easy creation of a new one.
Args:
    values: Values to be converted.

Returns:
    Converted payloads. Note, this does not have to be the same number
    as values given, but at least one must be present.

Raises:
    Exception: Any issue during conversion.
My rationale is that's what PyCharm defaults to, and in my experience it's the most popular format.
I don't have a strong preference; my request was based on personal experience.
"""See base class."""
message_type = payload.metadata.get("messageType", b"<unknown>").decode()
try:
    value = _sym_db.GetSymbol(message_type)()
Since we have the type hint here I recommend using that and falling back to the messageType header.
The type hint is optional for proto payloads, the message type is not (at least how we're developing this across SDKs), so we might as well use the thing we know is always there.
I know Dmitry had his concerns with breaking compatibility if the message name changes.
If we're concerned with message name changes in the payload, he should also be concerned with incompatible type hint changes. As it stands, there are changes a dev can make that can break things.
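The lookup order being debated could be sketched like this; the registry below is a hypothetical stand-in for protobuf's symbol database, and the function name and signature are illustrative only:

```python
from typing import Dict, Optional, Type

# Hypothetical stand-in for protobuf's symbol database:
# fully-qualified message name -> generated message class
_SYMBOLS: Dict[str, Type] = {}


def register(name: str, cls: Type) -> None:
    _SYMBOLS[name] = cls


def resolve(metadata: Dict[str, bytes], type_hint: Optional[Type] = None) -> Type:
    """Resolve the message class from the always-present messageType
    header; the optional type hint is only a fallback, since hints may
    be absent entirely (e.g. untyped call sites)."""
    message_type = metadata.get("messageType", b"<unknown>").decode()
    cls = _SYMBOLS.get(message_type)
    if cls is not None:
        return cls
    if type_hint is not None:
        # A renamed message breaks the header lookup; the hint can rescue it
        return type_hint
    raise KeyError(f"Unknown protobuf message type: {message_type}")


class Greeting:  # pretend generated message class
    pass


register("example.Greeting", Greeting)
assert resolve({"messageType": b"example.Greeting"}) is Greeting
# Renamed message: header lookup fails, the hint rescues it
assert resolve({"messageType": b"example.Renamed"}, type_hint=Greeting) is Greeting
```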
Gonna go ahead and merge this given a lack of large outstanding concerns and with an impending review for the client coming.
What was changed
Added initial impl of data converters. Also added some scaffold for the bridge and client, but both are likely to change quite a bit, so no need to review them too hard. Just trying to get eyes before moving on.