99 changes: 97 additions & 2 deletions Docs/durable-execution-design.md
@@ -424,6 +424,31 @@ await context.WaitAsync(TimeSpan.FromDays(7), name: "weekly_reminder");

> **Validation:** The duration must be at least 1 second. Values less than 1 second throw `ArgumentOutOfRangeException`. Sub-second precision is truncated to whole seconds (the underlying service operates at second granularity).

#### Wait For Condition

`WaitForConditionAsync` polls a user-supplied check function until a configured `IWaitStrategy<TState>` decides to stop. Between polls the workflow is suspended (no compute charge); the service re-invokes when the strategy's chosen delay elapses. The check function receives the state from the previous iteration, so users can carry per-poll bookkeeping inside the state itself.

```csharp
// Poll until an order's status reaches a terminal value.
var finalStatus = await context.WaitForConditionAsync<OrderStatus>(
    check: async (state, ctx) =>
    {
        ctx.Logger.LogInformation("Polling order on attempt {Attempt}", ctx.AttemptNumber);
        return await orderService.GetStatusAsync(orderId);
    },
    config: new WaitForConditionConfig<OrderStatus>
    {
        InitialState = OrderStatus.Unknown,
        WaitStrategy = WaitStrategy.Exponential<OrderStatus>(
            isDone: s => s == OrderStatus.Completed || s == OrderStatus.Cancelled)
    },
    name: "wait_for_order_settle");
```

The `WaitStrategy` factory provides the built-in strategies `Exponential`, `Linear`, and `Fixed`, each of which accepts an optional `isDone` predicate so the common case stays declarative; `FromDelegate` wraps a custom decision function for everything else. When a built-in strategy reaches its `maxAttempts` limit, it throws `WaitForConditionException` (carrying `AttemptsExhausted` and `LastState`). When the check function itself throws, the operation surfaces a `StepException` with the original error type. State is checkpointed on every iteration in the operation's payload, so polling survives Lambda re-invocations deterministically.
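As a rough illustration of the exhaustion path, the sketch below catches the exception and recovers the boxed last state with a type pattern. To stay compilable without the SDK it declares local stand-ins: the `WaitForConditionException` class here only mirrors the two properties named above (its constructor is an assumption), the `OrderStatus` enum is hypothetical, and the `throw` stands in for a real `WaitForConditionAsync` call that ran out of attempts.

```csharp
#nullable enable
using System;

try
{
    // Stand-in for `await context.WaitForConditionAsync<OrderStatus>(...)`
    // giving up after the strategy's attempt limit.
    throw new WaitForConditionException(attemptsExhausted: 60, lastState: OrderStatus.Pending);
}
catch (WaitForConditionException ex) when (ex.LastState is OrderStatus last)
{
    // LastState is typed object?, so the filter both null-checks and unboxes it.
    Console.WriteLine($"Gave up after {ex.AttemptsExhausted} attempts; last status was {last}.");
}

// Hypothetical state type used only for this sketch.
public enum OrderStatus { Unknown, Pending, Completed, Cancelled }

// Local stand-in mirroring the property shape described in the design doc;
// the constructor signature is an assumption, not the SDK's.
public class WaitForConditionException : Exception
{
    public WaitForConditionException(int attemptsExhausted, object? lastState)
        : base($"Condition not met after {attemptsExhausted} attempts.")
    {
        AttemptsExhausted = attemptsExhausted;
        LastState = lastState;
    }

    public int AttemptsExhausted { get; }
    public object? LastState { get; }
}
```

Filtering on the state type in the `when` clause means an exception whose `LastState` is missing or of an unexpected type keeps propagating instead of being swallowed.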

> **Cross-SDK note (Python migration):** .NET (like Java and JavaScript) treats `maxAttempts` exhaustion as a failure: the operation throws `WaitForConditionException` with the last observed state attached. **Python** instead returns `WaitDecision.no_wait()` from its built-in strategies, so a Python workflow that reaches max attempts *succeeds* with the last state as its result. The .NET behavior was chosen to match the majority of SDKs and to give callers an idiomatic typed-exception path. If you are porting a workflow from Python and want the succeed-with-last-state semantics, write a custom `IWaitStrategy<TState>` (or a `WaitStrategy.FromDelegate(...)` lambda) that returns `WaitDecision.Stop()` instead of throwing when the attempt counter is exhausted.
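A minimal sketch of such a strategy follows. It is not the SDK's implementation: `StopAtMaxAttempts` is a hypothetical name, and `WaitDecision` and `IWaitStrategy<TState>` are redeclared locally (mirroring the shapes in this design doc) only so the snippet compiles on its own. The loop at the top simulates polling against a status that never settles.

```csharp
using System;

// Simulated polling loop: the "check" always reports Pending.
var strategy = new StopAtMaxAttempts<string>(
    isDone: s => s == "Completed",
    maxAttempts: 3,
    delay: TimeSpan.FromSeconds(5));

var state = "Unknown";
for (var attempt = 1; ; attempt++)
{
    state = "Pending"; // stand-in for the check function's result
    var decision = strategy.Decide(state, attempt);
    Console.WriteLine($"attempt {attempt}: continue={decision.ShouldContinue}");
    if (!decision.ShouldContinue) break;
}
Console.WriteLine($"finished with last state: {state}");

// Local stand-ins mirroring the design doc's shapes (assumed, not the SDK types).
public readonly record struct WaitDecision(bool ShouldContinue, TimeSpan Delay)
{
    public static WaitDecision Stop() => new(false, TimeSpan.Zero);
    public static WaitDecision ContinueAfter(TimeSpan delay) => new(true, delay);
}

public interface IWaitStrategy<TState>
{
    WaitDecision Decide(TState state, int attemptNumber);
}

// Hypothetical strategy: stops (success) when the condition is met OR when
// the attempt budget is spent, instead of throwing on exhaustion.
public sealed class StopAtMaxAttempts<TState> : IWaitStrategy<TState>
{
    private readonly Func<TState, bool> _isDone;
    private readonly int _maxAttempts;
    private readonly TimeSpan _delay;

    public StopAtMaxAttempts(Func<TState, bool> isDone, int maxAttempts, TimeSpan delay)
    {
        _isDone = isDone;
        _maxAttempts = maxAttempts;
        _delay = delay;
    }

    // attemptNumber is the 1-based attempt that just ran.
    public WaitDecision Decide(TState state, int attemptNumber) =>
        _isDone(state) || attemptNumber >= _maxAttempts
            ? WaitDecision.Stop()
            : WaitDecision.ContinueAfter(_delay);
}
```

With this shape the caller can no longer tell "condition met" from "budget spent" by exception type, so the returned state has to be inspected to distinguish the two outcomes.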

---

### Callbacks
@@ -1154,7 +1179,13 @@ public interface IDurableContext
CancellationToken cancellationToken = default);

/// <summary>
/// Poll until a condition is met. The check function returns the next
/// state on each invocation; the configured <c>IWaitStrategy&lt;TState&gt;</c>
/// decides whether to keep polling and how long to wait between calls.
/// Per-iteration state is serialized via the <c>ILambdaSerializer</c>
/// registered on <c>ILambdaContext.Serializer</c> (typically configured via
/// <c>LambdaBootstrapBuilder.Create(handler, serializer)</c>); AOT and
/// reflection-based callers share this single overload.
/// </summary>
Task<TState> WaitForConditionAsync<TState>(
Func<TState, IConditionCheckContext, Task<TState>> check,
@@ -1208,6 +1239,63 @@ public interface IWaitForCallbackContext
/// traces and can be inspected by name in the test runner.
/// </summary>
public record DurableBranch<T>(string Name, Func<IDurableContext, Task<T>> Func);

/// <summary>
/// Context passed to a WaitForCondition check function on every polling
/// iteration. Mirrors IStepContext minus OperationId (every iteration of a
/// wait-for-condition operation shares the same operation ID, so exposing
/// it here would be misleading).
/// </summary>
public interface IConditionCheckContext
{
    /// <summary>Logger scoped to this condition-check attempt.</summary>
    ILogger Logger { get; }

    /// <summary>The current 1-based attempt number.</summary>
    int AttemptNumber { get; }
}

/// <summary>
/// Decides, per polling iteration, whether a WaitForConditionAsync operation
/// should keep polling and how long to wait. Implementations are typically
/// obtained via the <c>WaitStrategy</c> factory; users may also implement
/// directly. Built-in implementations throw <c>WaitForConditionException</c>
/// when their max-attempts limit is reached so the operation can produce a
/// failure with the last observed state.
/// </summary>
public interface IWaitStrategy<TState>
{
    WaitDecision Decide(TState state, int attemptNumber);
}

/// <summary>
/// Decision returned by IWaitStrategy on each polling iteration. Stop()
/// indicates the condition has been met (the operation succeeds and returns
/// the latest state); ContinueAfter(delay) schedules the next poll.
/// </summary>
public readonly record struct WaitDecision
{
    public bool ShouldContinue { get; }
    public TimeSpan Delay { get; }
    public static WaitDecision Stop();
    public static WaitDecision ContinueAfter(TimeSpan delay);
}

/// <summary>
/// Factory for built-in IWaitStrategy implementations. Exponential, Linear,
/// and Fixed each accept an optional isDone predicate so users can terminate
/// polling declaratively when the latest state satisfies a condition
/// (e.g. state =&gt; state.IsReady) without implementing IWaitStrategy
/// themselves; FromDelegate wraps a custom decision function. Defaults are
/// intentionally tuned for polling, NOT retry-on-exception: 60 attempts /
/// 5s initial / 300s max / 1.5x backoff / Full jitter (matches
/// Python+JS+Java SDKs).
/// </summary>
public static class WaitStrategy
{
    public static IWaitStrategy<TState> Exponential<TState>(...);
    public static IWaitStrategy<TState> Linear<TState>(...);
    public static IWaitStrategy<TState> Fixed<TState>(TimeSpan delay, ...);
    public static IWaitStrategy<TState> FromDelegate<TState>(Func<TState, int, WaitDecision> strategy);
}
```
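To make the defaults quoted in the `WaitStrategy` summary concrete, the sketch below prints the un-jittered delay ceiling for the first twelve attempts. It assumes the delay follows `min(initialDelay * backoff^(attempt-1), maxDelay)` rounded up to whole seconds with a 1-second floor, as the `ExponentialBackoff` helper in this PR describes; `Backoff.DelayWithoutJitter` is a hypothetical helper written for this illustration, not part of the SDK.

```csharp
using System;

// Defaults from the WaitStrategy summary: 5s initial, 300s max, 1.5x backoff.
for (var attempt = 1; attempt <= 12; attempt++)
{
    var delay = Backoff.DelayWithoutJitter(attempt, initialSeconds: 5, maxSeconds: 300, backoffRate: 1.5);
    Console.WriteLine($"attempt {attempt,2}: up to {delay.TotalSeconds}s");
}

// Hypothetical helper for this sketch only.
static class Backoff
{
    public static TimeSpan DelayWithoutJitter(
        int attemptNumber, double initialSeconds, double maxSeconds, double backoffRate)
    {
        // Exponential growth from the initial delay, capped at the maximum.
        var baseDelay = initialSeconds * Math.Pow(backoffRate, attemptNumber - 1);
        var capped = Math.Min(baseDelay, maxSeconds);

        // Round up to whole seconds, never below 1 second.
        return TimeSpan.FromSeconds(Math.Max(1, Math.Ceiling(capped)));
    }
}
```

Under these assumptions the ceiling runs 5s, 8s, 12s, 17s and first reaches the 300s cap at attempt 12. With Full jitter the actual delay is drawn between the 1-second floor and that ceiling, so real schedules are shorter on average.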

#### CancellationToken behavior
@@ -1646,11 +1734,18 @@ public class ChildContextException : DurableExecutionException

/// <summary>
/// Thrown when a wait-for-condition operation exhausts all attempts
/// without the condition being met. Subclassable: future failure modes
/// (e.g. timeout) should add derived exceptions rather than discriminator
/// flags so callers can catch by static type.
/// </summary>
public class WaitForConditionException : DurableExecutionException
{
    public int AttemptsExhausted { get; }

    /// <summary>The most recent state observed by the check function before
    /// the strategy gave up. Boxed because the exception type is not generic;
    /// callers cast to the workflow's known state type.</summary>
    public object? LastState { get; }
}

/// <summary>
19 changes: 19 additions & 0 deletions Libraries/src/Amazon.Lambda.DurableExecution/DurableContext.cs
@@ -119,6 +119,25 @@ public async Task RunInChildContextAsync(
name, config, cancellationToken);
}

public Task<TState> WaitForConditionAsync<TState>(
    Func<TState, IConditionCheckContext, Task<TState>> check,
    WaitForConditionConfig<TState> config,
    string? name = null,
    CancellationToken cancellationToken = default)
{
    ArgumentNullException.ThrowIfNull(check);
    ArgumentNullException.ThrowIfNull(config);
    ArgumentNullException.ThrowIfNull(config.WaitStrategy);

    var serializer = LambdaSerializerHelper.GetRequired(LambdaContext);

    var operationId = _idGenerator.NextId();
    var op = new WaitForConditionOperation<TState>(
        operationId, name, _idGenerator.ParentId, check, config, serializer, Logger,
        _state, _terminationManager, _durableExecutionArn, _batcher);
    return op.ExecuteAsync(cancellationToken);
}

private Task<T> RunChildContext<T>(
Func<IDurableContext, Task<T>> func,
string? name,
@@ -0,0 +1,26 @@
using Microsoft.Extensions.Logging;

namespace Amazon.Lambda.DurableExecution;

/// <summary>
/// Context passed to a <c>WaitForConditionAsync</c> check function on every
/// polling iteration. Provides a logger scoped to the current attempt and the
/// 1-based attempt number, mirroring the surface of
/// <see cref="IStepContext"/> (minus <c>OperationId</c>: every iteration of a
/// wait-for-condition operation shares the same operation ID, so exposing it
/// here would be misleading — see <c>DESIGN-QUESTIONS.md#Q6</c>).
/// </summary>
public interface IConditionCheckContext
{
    /// <summary>
    /// Logger scoped to this condition-check attempt.
    /// </summary>
    ILogger Logger { get; }

    /// <summary>
    /// The current 1-based attempt number. Increments on every polling
    /// iteration; on replay, equals the number of attempts already
    /// checkpointed plus one.
    /// </summary>
    int AttemptNumber { get; }
}
29 changes: 29 additions & 0 deletions Libraries/src/Amazon.Lambda.DurableExecution/IDurableContext.cs
@@ -147,6 +147,35 @@ Task<T> WaitForCallbackAsync<T>(
string? name = null,
WaitForCallbackConfig? config = null,
CancellationToken cancellationToken = default);

/// <summary>
/// Poll a condition by repeatedly invoking <paramref name="check"/> until
/// the configured <see cref="IWaitStrategy{TState}"/> decides to stop.
/// Between polls the workflow is suspended (no compute charge); the
/// service re-invokes the Lambda when the strategy's chosen delay elapses.
/// </summary>
/// <remarks>
/// On every iteration the <paramref name="check"/> function receives the
/// state returned by the previous invocation (seeded by
/// <see cref="WaitForConditionConfig{TState}.InitialState"/> on the very
/// first call), so users can carry per-poll bookkeeping (e.g. a cursor or
/// retry counter) inside the state itself. If the strategy stops because
/// of <see cref="IWaitStrategy{TState}"/>'s max-attempts limit (rather
/// than because the condition is met), a <see cref="WaitForConditionException"/>
/// is thrown carrying the last observed state.
/// The check function's return value is serialized to a checkpoint using the
/// <see cref="ILambdaSerializer"/> registered on
/// <see cref="ILambdaContext.Serializer"/> (typically configured via
/// <c>LambdaBootstrapBuilder.Create(handler, serializer)</c>). AOT and
/// reflection-based scenarios share this single overload — the AOT story is
/// determined by the registered serializer (e.g.,
/// <c>SourceGeneratorLambdaJsonSerializer&lt;TContext&gt;</c>).
/// </remarks>
Task<TState> WaitForConditionAsync<TState>(
Func<TState, IConditionCheckContext, Task<TState>> check,
WaitForConditionConfig<TState> config,
string? name = null,
CancellationToken cancellationToken = default);
}

/// <summary>
27 changes: 27 additions & 0 deletions Libraries/src/Amazon.Lambda.DurableExecution/IWaitStrategy.cs
@@ -0,0 +1,27 @@
namespace Amazon.Lambda.DurableExecution;

/// <summary>
/// Decides, per polling iteration, whether a <c>WaitForConditionAsync</c>
/// operation should keep polling and how long to wait before the next attempt.
/// </summary>
/// <remarks>
/// Distinct from <see cref="IRetryStrategy"/>: that interface decides
/// retry-on-exception (input is the thrown <see cref="Exception"/>); this one
/// decides poll-until-condition (input is the latest <typeparamref name="TState"/>
/// observed by the check function). Implementations are typically obtained
/// via the <see cref="WaitStrategy"/> factory; users who need richer logic
/// (e.g. wall-clock-time budgets, conditional jitter) can implement this
/// interface directly.
/// </remarks>
/// <typeparam name="TState">The state type produced by the check function.</typeparam>
public interface IWaitStrategy<TState>
{
    /// <summary>
    /// Evaluates the latest <paramref name="state"/> from the check function
    /// and the 1-based <paramref name="attemptNumber"/> just executed, and
    /// returns either <see cref="WaitDecision.Stop"/> (terminate) or
    /// <see cref="WaitDecision.ContinueAfter(TimeSpan)"/> (poll again after
    /// the given delay).
    /// </summary>
    WaitDecision Decide(TState state, int attemptNumber);
}
@@ -0,0 +1,41 @@
namespace Amazon.Lambda.DurableExecution.Internal;

/// <summary>
/// Shared exponential-backoff math for both
/// <see cref="ExponentialRetryStrategy"/> (retry-on-exception) and
/// <c>ExponentialWaitStrategy&lt;TState&gt;</c> (wait-for-condition polling).
/// Computes <c>min(initialDelay * backoff^(attempt-1), maxDelay)</c>, applies
/// the requested jitter, then rounds up to whole seconds with a 1-second floor
/// (the service timer's smallest unit).
/// </summary>
internal static class ExponentialBackoff
{
    [ThreadStatic]
    private static Random? t_random;
    private static Random Random => t_random ??= new Random();

    /// <summary>
    /// Computes the delay for the given <paramref name="attemptNumber"/> (1-based)
    /// using exponential backoff with the requested jitter strategy. Returned
    /// delay is always at least 1 second (service timer floor).
    /// </summary>
    public static TimeSpan CalculateDelay(
        int attemptNumber,
        TimeSpan initialDelay,
        TimeSpan maxDelay,
        double backoffRate,
        JitterStrategy jitter)
    {
        var baseDelay = initialDelay.TotalSeconds * Math.Pow(backoffRate, attemptNumber - 1);
        var cappedDelay = Math.Min(baseDelay, maxDelay.TotalSeconds);

        var finalDelay = jitter switch
        {
            JitterStrategy.Full => Random.NextDouble() * cappedDelay,
            JitterStrategy.Half => cappedDelay * (0.5 + 0.5 * Random.NextDouble()),
            _ => cappedDelay
        };

        return TimeSpan.FromSeconds(Math.Max(1, Math.Ceiling(finalDelay)));
    }
}