Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 15 additions & 3 deletions Libraries/src/Amazon.Lambda.DurableExecution/DurableContext.cs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
using Amazon.Lambda.Core;
using Amazon.Lambda.DurableExecution.Internal;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;

namespace Amazon.Lambda.DurableExecution;

Expand All @@ -17,6 +16,7 @@ internal sealed class DurableContext : IDurableContext
private readonly OperationIdGenerator _idGenerator;
private readonly string _durableExecutionArn;
private readonly CheckpointBatcher? _batcher;
private ILogger _logger;

public DurableContext(
ExecutionState state,
Expand All @@ -32,13 +32,25 @@ public DurableContext(
_durableExecutionArn = durableExecutionArn;
_batcher = batcher;
LambdaContext = lambdaContext;
_logger = new ReplayAwareLogger(new LambdaCoreLogger(), state, modeAware: true);
}

// Replay-safe logger ships in a follow-up PR; see IDurableContext.Logger doc.
public ILogger Logger => NullLogger.Instance;
public ILogger Logger => _logger;
public IExecutionContext ExecutionContext => new DurableExecutionContext(_durableExecutionArn);
public ILambdaContext LambdaContext { get; }

public void ConfigureLogger(LoggerConfig config)
{
if (config == null) throw new ArgumentNullException(nameof(config));

// If the user supplies a CustomLogger, wrap it. Otherwise re-wrap the
// existing inner logger (unwrapping if it was already a ReplayAwareLogger)
// so toggling ModeAware works without losing the previous custom logger.
var inner = config.CustomLogger
?? (_logger is ReplayAwareLogger existing ? existing.Inner : _logger);
_logger = new ReplayAwareLogger(inner, _state, config.ModeAware);
}

public Task<T> StepAsync<T>(
Func<IStepContext, Task<T>> func,
string? name = null,
Expand Down
18 changes: 15 additions & 3 deletions Libraries/src/Amazon.Lambda.DurableExecution/DurableFunction.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
using Amazon.Lambda.DurableExecution.Services;
using Amazon.Lambda.Model;
using Amazon.Runtime;
using Microsoft.Extensions.Logging;

namespace Amazon.Lambda.DurableExecution;

Expand Down Expand Up @@ -111,9 +112,20 @@ private static async Task<DurableExecutionInvocationOutput> WrapAsyncCore<TInput
HandlerResult<TOutput> result;
try
{
result = await DurableExecutionHandler.RunAsync<TOutput>(
state, terminationManager,
async () => await workflow(userPayload, context));
// Push execution-level metadata into a logging scope so structured
// providers (the runtime's JSON formatter, Serilog, Powertools,
// etc.) tag every log line emitted by user code with the
// execution ARN and request id.
using (context.Logger.BeginScope(new Dictionary<string, object>
{
["durableExecutionArn"] = invocationInput.DurableExecutionArn,
["awsRequestId"] = lambdaContext.AwsRequestId ?? string.Empty,
}))
Comment thread
GarrettBeatty marked this conversation as resolved.
{
result = await DurableExecutionHandler.RunAsync<TOutput>(
state, terminationManager,
async () => await workflow(userPayload, context));
}

await batcher.DrainAsync();
}
Expand Down
21 changes: 16 additions & 5 deletions Libraries/src/Amazon.Lambda.DurableExecution/IDurableContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,21 @@ namespace Amazon.Lambda.DurableExecution;
public interface IDurableContext
{
/// <summary>
/// A logger scoped to the durable execution. Currently returns
/// <see cref="Microsoft.Extensions.Logging.Abstractions.NullLogger.Instance"/>;
/// the replay-safe <c>DurableLogger</c> (suppresses messages during replay)
/// ships in a follow-up PR.
/// Replay-safe logger. Messages emitted while the workflow is re-deriving
/// prior operations from checkpointed state are suppressed by default, so
/// a 30-step workflow re-invoked 30 times still emits each line once.
/// Use this instead of <c>Console.WriteLine</c> or other ambient loggers,
/// which will repeat on every replay. Replace the underlying logger or
/// disable replay-aware filtering via <see cref="ConfigureLogger"/>.
/// </summary>
ILogger Logger { get; }

/// <summary>
/// Swap the underlying logger or toggle replay-aware filtering. Idempotent —
/// later calls overwrite earlier configuration.
/// </summary>
void ConfigureLogger(LoggerConfig config);

/// <summary>
/// Metadata about the current durable execution.
/// </summary>
Expand Down Expand Up @@ -68,7 +76,10 @@ Task WaitAsync(
public interface IStepContext
{
/// <summary>
/// Logger scoped to this step.
/// Logger scoped to this step. Same instance as
/// <see cref="IDurableContext.Logger"/>; emits within an
/// <see cref="ILogger.BeginScope{TState}"/> that carries the step's
/// <c>operationId</c>, <c>operationName</c>, and <c>attempt</c>.
/// </summary>
ILogger Logger { get; }

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
using System.Collections.Generic;
using System.Text;
using Microsoft.Extensions.Logging;
using CoreLambdaLogger = Amazon.Lambda.Core.LambdaLogger;

namespace Amazon.Lambda.DurableExecution.Internal;

/// <summary>
/// Default <see cref="ILogger"/> for <see cref="DurableContext"/>. Routes log
/// records through <see cref="CoreLambdaLogger"/> so they flow into the same
/// pipeline used by the rest of the AWS Lambda for .NET runtime — the runtime
/// host installs a redirector that produces structured JSON when
/// <c>AWS_LAMBDA_LOG_FORMAT=JSON</c> and honors <c>AWS_LAMBDA_LOG_LEVEL</c>.
/// </summary>
/// <remarks>
/// In-package adapter to avoid forcing a dependency on
/// <c>Amazon.Lambda.Logging.AspNetCore</c>; users who want a richer experience
/// (Serilog, Powertools, etc.) can swap their own logger via
/// <see cref="IDurableContext.ConfigureLogger"/>.
///
/// When <c>state</c> is the standard <c>FormattedLogValues</c> produced by
/// <see cref="LoggerExtensions"/>, the original template and named arguments
/// are forwarded so the runtime's JSON formatter surfaces named placeholders
/// (<c>{OrderId}</c>) as top-level structured attributes. Mirrors the pattern
/// in <c>Amazon.Lambda.Logging.AspNetCore.LambdaILogger</c>.
///
/// <see cref="BeginScope"/> maintains an <see cref="AsyncLocal{T}"/> chain of
/// scope state. Scopes whose state is a key/value collection have each entry
/// appended to the outgoing template/args, so structured scope metadata
/// (<c>durableExecutionArn</c>, <c>operationId</c>, etc.) shows up as
/// top-level JSON fields without callers having to swap in a third-party
/// logger. Inner scopes win on key collision; explicit message arguments
/// always win over scope keys.
/// </remarks>
internal sealed class LambdaCoreLogger : ILogger
{
private const string OriginalFormatKey = "{OriginalFormat}";

private static readonly AsyncLocal<Scope?> CurrentScope = new();

public IDisposable BeginScope<TState>(TState state) where TState : notnull
{
var scope = new Scope(state, CurrentScope.Value);
CurrentScope.Value = scope;
return scope;
}

// Level filtering is performed by the runtime layer (AWS_LAMBDA_LOG_LEVEL).
public bool IsEnabled(LogLevel logLevel) => logLevel != LogLevel.None;
Comment thread
GarrettBeatty marked this conversation as resolved.

public void Log<TState>(
LogLevel logLevel,
EventId eventId,
TState state,
Exception? exception,
Func<TState, Exception?, string> formatter)
{
if (!IsEnabled(logLevel)) return;

string? messageTemplate = null;
var parameters = new List<object>();
HashSet<string>? claimedKeys = null;

if (state is IEnumerable<KeyValuePair<string, object?>> structure)
{
foreach (var property in structure)
{
if (property is { Key: OriginalFormatKey, Value: string value })
{
messageTemplate = value;
}
else
{
parameters.Add(property.Value!);
claimedKeys ??= new HashSet<string>(StringComparer.Ordinal);
claimedKeys.Add(property.Key);
}
}

// No {OriginalFormat} → not a real FormattedLogValues; ignore the args
// we collected and fall back to the formatter below.
if (messageTemplate == null)
{
parameters.Clear();
claimedKeys = null;
}
}

messageTemplate ??= formatter(state, exception);

AppendScopeAttributes(ref messageTemplate, parameters, ref claimedKeys);

var levelName = logLevel.ToString();
var args = parameters.Count == 0 ? Array.Empty<object>() : parameters.ToArray();
if (exception != null)
{
CoreLambdaLogger.Log(levelName, exception, messageTemplate, args);
}
else
{
CoreLambdaLogger.Log(levelName, messageTemplate, args);
}
}

private static void AppendScopeAttributes(
ref string messageTemplate,
List<object> parameters,
ref HashSet<string>? claimedKeys)
{
var current = CurrentScope.Value;
if (current == null) return;

StringBuilder? sb = null;

// Walk innermost → outermost so the first key seen for a given name wins
// (mirrors how Microsoft.Extensions.Logging structured providers resolve
// overlapping scope keys: the closest scope dominates).
for (var s = current; s != null; s = s.Parent)
{
if (s.State is not IEnumerable<KeyValuePair<string, object?>> kvps) continue;
foreach (var kvp in kvps)
{
// Skip {OriginalFormat} (some scope-state factories emit one).
if (kvp.Key == OriginalFormatKey) continue;

claimedKeys ??= new HashSet<string>(StringComparer.Ordinal);
if (!claimedKeys.Add(kvp.Key)) continue;

sb ??= new StringBuilder(messageTemplate);
sb.Append(' ').Append('{').Append(kvp.Key).Append('}');
parameters.Add(kvp.Value!);
}
}

if (sb != null) messageTemplate = sb.ToString();
}

private sealed class Scope : IDisposable
{
public object State { get; }
public Scope? Parent { get; }
private bool _disposed;

public Scope(object state, Scope? parent)
{
State = state;
Parent = parent;
}

public void Dispose()
{
if (_disposed) return;
_disposed = true;

// Restore the parent. Out-of-order disposal would desync the chain,
// but that violates the using-statement contract that callers rely
// on; we don't try to defend against it.
CurrentScope.Value = Parent;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
using Microsoft.Extensions.Logging;

namespace Amazon.Lambda.DurableExecution.Internal;

/// <summary>
/// <see cref="ILogger"/> decorator that suppresses messages while the workflow
/// is replaying prior operations. Reads <see cref="ExecutionState.IsReplaying"/>
/// on every call so it correctly transitions to passthrough the moment the
/// state's per-operation tracker decides we've caught up to fresh execution.
/// </summary>
/// <remarks>
/// Mirrors the suppression behavior of the Python and Java durable execution
/// SDKs: replay <see cref="Log{TState}"/> calls return without invoking the
/// inner logger. <see cref="BeginScope{TState}"/> always delegates so scopes
/// stay balanced — suppression only applies at log emission.
/// </remarks>
internal sealed class ReplayAwareLogger : ILogger
{
private readonly ILogger _inner;
private readonly ExecutionState _state;
private readonly bool _modeAware;

public ReplayAwareLogger(ILogger inner, ExecutionState state, bool modeAware)
{
_inner = inner;
_state = state;
_modeAware = modeAware;
}

/// <summary>The wrapped logger; exposed so <c>ConfigureLogger</c> can rewrap without losing it.</summary>
public ILogger Inner => _inner;

/// <summary>Whether replay suppression is active.</summary>
public bool ModeAware => _modeAware;

public IDisposable? BeginScope<TState>(TState state) where TState : notnull
=> _inner.BeginScope(state);

public bool IsEnabled(LogLevel logLevel)
{
if (ShouldSuppress()) return false;
return _inner.IsEnabled(logLevel);
}

public void Log<TState>(
LogLevel logLevel,
EventId eventId,
TState state,
Exception? exception,
Func<TState, Exception?, string> formatter)
{
if (ShouldSuppress()) return;
_inner.Log(logLevel, eventId, state, exception, formatter);
}

private bool ShouldSuppress() => _modeAware && _state.IsReplaying;
}
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,21 @@ private async Task<T> ExecuteFunc(int attemptNumber, CancellationToken cancellat
try
{
var stepContext = new StepContext(OperationId, attemptNumber, _logger);
var result = await _func(stepContext);

// Step-scoped metadata so structured log providers tag user code
// lines with the operation id, name, and current attempt. Wrap
// only the user-func call — checkpoint emission shouldn't carry
// step metadata into any side-channel logging.
T result;
using (_logger.BeginScope(new Dictionary<string, object>
{
["operationId"] = OperationId,
["operationName"] = Name ?? string.Empty,
["attempt"] = attemptNumber,
Comment thread
GarrettBeatty marked this conversation as resolved.
}))
{
result = await _func(stepContext);
}

await EnqueueAsync(new SdkOperationUpdate
{
Expand Down
24 changes: 24 additions & 0 deletions Libraries/src/Amazon.Lambda.DurableExecution/LoggerConfig.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
using Microsoft.Extensions.Logging;

namespace Amazon.Lambda.DurableExecution;

/// <summary>
/// Configuration for <see cref="IDurableContext.ConfigureLogger"/>. Lets users
/// swap the underlying <see cref="ILogger"/> (e.g. Serilog, AWS Lambda Powertools)
/// or disable replay-aware filtering for debugging.
/// </summary>
public sealed class LoggerConfig
{
/// <summary>
/// Optional <see cref="ILogger"/> to use instead of the SDK default. When
/// null, the durable context keeps its existing inner logger.
/// </summary>
public ILogger? CustomLogger { get; init; }

/// <summary>
/// When true (default), messages are suppressed while the workflow is
/// re-deriving prior operations from checkpointed state. Set to false to
/// see every log line on every replay (useful for local debugging).
/// </summary>
public bool ModeAware { get; init; } = true;
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
<PackageReference Include="AWSSDK.ECR" Version="4.0.7" />
<PackageReference Include="AWSSDK.Lambda" Version="4.0.13.1" />
<PackageReference Include="AWSSDK.SecurityToken" Version="4.0.6.3" />
<PackageReference Include="AWSSDK.CloudWatchLogs" Version="4.0.20" />
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="18.5.1" />
<PackageReference Include="xunit" Version="2.4.1" />
<PackageReference Include="xunit.runner.visualstudio" Version="2.4.3" />
Expand Down
Loading
Loading