Add OpenTelemetry observability to custom background tasks#812
Add OpenTelemetry observability to custom background tasks#8122chanhaeng wants to merge 3 commits into
Conversation
|
Important Review skippedDraft detected. Please check the settings in the CodeRabbit UI or the ⚙️ Run configurationConfiguration used: Repository UI Review profile: ASSERTIVE Plan: Pro Run ID: You can disable this status message by setting the Use the checkbox below for a quick retry:
✨ Finishing Touches🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Code Review
This pull request introduces a custom background task API to Fedify, allowing developers to define, enqueue, and process arbitrary background jobs with type-safe payload validation via Standard Schema. The implementation supports robust serialization of complex types and Activity Vocabulary objects using devalue, customizable retry policies, queue routing, best-effort or native deduplication, and OpenTelemetry instrumentation. Feedback on the changes highlights a compatibility issue with Node.js 20 due to the use of Array.fromAsync in codec.ts, suggesting standard for...of loops instead, and recommends implementing a recursion depth limit during deserialization to prevent potential Denial of Service (DoS) attacks from deeply nested payloads.
Important
The consumer version of Gemini Code Assist on GitHub is being sunset. Starting June 18, 2026, new organization installations will be blocked, and all code review activity will officially cease on July 17, 2026.
For more details on the timeline and next steps, please review the Help Documentation.
Codecov Report✅ All modified and coverable lines are covered by tests.
... and 1 file with indirect coverage changes 🚀 New features to boost your workflow:
|
Context.enqueueTask() and enqueueTaskMany() now accept a
deduplicationKey requesting at-most-once enqueue for tasks that share
it (new TaskEnqueueOptions.deduplicationKey).
Resolution follows the queue and key-value store capabilities:
- A queue declaring the new MessageQueue.nativeDeduplication owns the
check; the key is forwarded through the new
MessageQueueEnqueueOptions.deduplicationKey.
- Otherwise Fedify applies a best-effort guard through the optional
KvStore.cas primitive under a new taskDeduplication key prefix,
tunable with the new FederationOptions.taskDeduplicationTtl and
taskDeduplicationFallback options.
For enqueueTaskMany(), a single key governs the whole batch. A native
queue that does not implement enqueueMany() cannot express batch-level
at-most-once with a per-message key, so such a multi-item enqueue is
rejected with a TypeError instead of silently leaking duplicates.
Configuration errors that are decidable without a payload (a native
queue lacking enqueueMany, or a closed fallback without cas) are
checked before payloads are validated and encoded, so they reject
before any user schema runs or any key is reserved.
fedify-dev#798
Assisted-by: Claude Code:claude-opus-4-8
Layer task-specific telemetry onto the custom background task dispatch path, reusing the queue-task metric pattern and mirroring the existing `http_signatures.failure_reason` enum in metrics.ts. Each dequeued task now runs in a `fedify.task` span that inherits the enqueue site's trace context and carries `fedify.task.name`, `fedify.task.attempt`, and, on a terminal failure, `fedify.task.failure_reason`. The `fedify.queue.task.*` metrics report task runs under the new `"task"` role with the task name and, on failure, a bounded `fedify.task.failure_reason`. To tell the failure reasons apart, `#listenTaskMessage` splits the former `decode()` call into its deserialize and validate phases and returns the decision point that failed: `deserialization`, `validation`, `unknown_task`, or `handler`. A swallowed abort is reported as a graceful interruption, not a failure. The reported `fedify.queue.backend` reflects the resolved queue so it stays accurate under the outbox fallback. Public surface: `QueueTaskRole` gains `"task"`, `QueueTaskCommonAttributes` gains `taskName`, and a new `QueueTaskFailureReason` type plus an optional trailing `failureReason` parameter on `recordQueueTaskOutcome()` carry the reason. `TaskCodec` exposes an instance `validate()` wrapper so the dispatch site can split decoding without importing the class. fedify-dev#799 Assisted-by: Claude Code:claude-opus-4-8
Deno executes the TypeScript sources directly, so `test:deno` spent most of its time on a `build` it never needed: with no dist/ output present, the whole Deno suite passes except the npm packaging regression tests added for fedify-dev#655, which assert that the built package.json entry points of `@fedify/cli`, `@fedify/create`, and `@fedify/init` exist. Those checks guard the npm artifacts, not the Deno runtime, and still run under `test:node` and `test:bun`, which build first—so skip them under Deno and drop the `build` dependency from `test:deno`. `@fedify/lint`'s oxlint integration test already skips itself when *dist/oxlint.js* is absent. Update AGENTS.md to match: document `mise run build`/`prepare-each` for building, `check-each` and `test-each` for scoping work to specific packages, recommend the now build-free `test:deno` as the default test loop during development, and add a section directing agents to consult `mise tasks`. Assisted-by: Claude Code:claude-opus-4-8 Assisted-by: Claude Code:claude-fable-5
There was a problem hiding this comment.
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: a2788986f1
ℹ️ About Codex in GitHub
Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you
- Open a pull request for review
- Mark a draft as ready
- Comment "@codex review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".
| ); | ||
| } | ||
| // A swallowed abort is a graceful interruption, not a task failure. | ||
| return isAbortError(error) ? undefined : "handler"; |
There was a problem hiding this comment.
Do not mark retried task attempts as failed
When a handler throws on a non-native queue and the retry policy returns a delay, the branch above successfully re-enqueues the retry but this return value still reports "handler" to the wrapper. That wrapper then records fedify.queue.task.failed and sets the span to ERROR for every transient task error that is being retried, even though the attempt was folded into a retry (matching the existing inbox/outbox internal-retry convention and the docs' “terminal failure” wording). This will inflate failed-task alerts for workloads with normal retries; only the give-up path should return the handler failure reason.
Useful? React with 👍 / 👎.
There was a problem hiding this comment.
Actionable comments posted: 2
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
packages/fedify/src/federation/tasks/enqueue.ts (1)
98-113: 🎯 Functional Correctness | 🟠 Major | ⚡ Quick winEnqueue metrics can undercount partial fan-out successes.
Whendispatch()falls back toPromise.all(queue.enqueue(...)), one rejected enqueue aborts the whole batch beforerecordQueueTaskEnqueued()runs, so messages that already reached the backend never get counted. Consider recording per-message success in the fan-out path or switching that branch toPromise.allSettled().🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@packages/fedify/src/federation/tasks/enqueue.ts` around lines 98 - 113, The enqueue metrics in enqueue() can miss partially successful fan-out enqueues because Promise.all aborts before recordQueueTaskEnqueued() runs. Update the dispatch flow so each message’s successful enqueue is recorded individually in the Promise.all(queue.enqueue(...)) path, or change that branch to Promise.allSettled() and count only the fulfilled enqueues while preserving the existing rollback/error handling.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@CHANGES.md`:
- Line 48: The changelog entry in CHANGES.md has malformed issue references
because two adjacent links are missing a separator, causing them to render
together. Update the existing contributor entry that contains the references
near `#799` and `#803` so it keeps the same [[`#123`] by Name] style while adding the
missing comma separator between those two references, and leave the rest of the
reference formatting unchanged.
In `@packages/fedify/src/federation/tasks/tasks.test.ts`:
- Around line 1524-1552: The retry-path test in tasks.test.ts only checks the
re-enqueue metric and misses the paired failure metric. Update the test around
processQueuedTask to also assert that `#listenTaskMessage` emits
fedify.queue.task.failed with fedify.task.failure_reason set to "handler" when
the handler throws and retryPolicy schedules a retry. Use the existing recorder
assertions alongside fedify.queue.task.enqueued so the test covers both
measurements for the retry flow.
---
Outside diff comments:
In `@packages/fedify/src/federation/tasks/enqueue.ts`:
- Around line 98-113: The enqueue metrics in enqueue() can miss partially
successful fan-out enqueues because Promise.all aborts before
recordQueueTaskEnqueued() runs. Update the dispatch flow so each message’s
successful enqueue is recorded individually in the
Promise.all(queue.enqueue(...)) path, or change that branch to
Promise.allSettled() and count only the fulfilled enqueues while preserving the
existing rollback/error handling.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Repository UI
Review profile: ASSERTIVE
Plan: Pro
Run ID: 2c06364a-9cde-4035-a0db-fa9658e132f4
📒 Files selected for processing (13)
AGENTS.mdCHANGES.mddocs/manual/opentelemetry.mddocs/manual/tasks.mdmise.tomlpackages/cli/src/startup.test.tspackages/create/src/package.test.tspackages/fedify/src/federation/metrics.tspackages/fedify/src/federation/middleware.tspackages/fedify/src/federation/tasks/codec.tspackages/fedify/src/federation/tasks/enqueue.tspackages/fedify/src/federation/tasks/tasks.test.tspackages/init/src/package.test.ts
| `FederationOptions.taskDeduplicationFallback` options. | ||
|
|
||
| [[#206], [#797], [#798], [#803], [#806] by ChanHaeng Lee] | ||
| [[#206], [#797], [#798], [#799] [#803], [#806], [#812] by ChanHaeng Lee] |
There was a problem hiding this comment.
📐 Maintainability & Code Quality | 🟡 Minor | ⚡ Quick win
Fix the malformed changelog references.
[#799] [#803] is missing a separator, so the rendered changelog will merge those references. Please change it to [#799], [#803] and keep the existing [[#123] by Name] style intact.
As per coding guidelines, CHANGES.md external-contributor entries should use [[#123] by Name] reference formatting.
Suggested fix
- [[`#206`], [`#797`], [`#798`], [`#799`] [`#803`], [`#806`], [`#812`] by ChanHaeng Lee]
+ [[`#206`], [`#797`], [`#798`], [`#799`], [`#803`], [`#806`], [`#812`] by ChanHaeng Lee]📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| [[#206], [#797], [#798], [#799] [#803], [#806], [#812] by ChanHaeng Lee] | |
| [[`#206`], [`#797`], [`#798`], [`#799`], [`#803`], [`#806`], [`#812`] by ChanHaeng Lee] |
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@CHANGES.md` at line 48, The changelog entry in CHANGES.md has malformed issue
references because two adjacent links are missing a separator, causing them to
render together. Update the existing contributor entry that contains the
references near `#799` and `#803` so it keeps the same [[`#123`] by Name] style while
adding the missing comma separator between those two references, and leave the
rest of the reference formatting unchanged.
Source: Coding guidelines
| await t.step( | ||
| "records the retry re-enqueue with role task and a bumped attempt", | ||
| async () => { | ||
| const queue = new MockQueue(); | ||
| const { federation, recorder } = instrument({ | ||
| ...baseOptions, | ||
| queue: { task: queue }, | ||
| }); | ||
| federation.defineTask("retry-me", { | ||
| schema: stringSchema, | ||
| handler: () => { | ||
| throw new Error("boom"); | ||
| }, | ||
| retryPolicy: () => Temporal.Duration.from({ milliseconds: 1 }), | ||
| }); | ||
| await federation.processQueuedTask( | ||
| undefined, | ||
| await makeTaskMessage("retry-me", "payload"), | ||
| ); | ||
|
|
||
| strictEqual(queue.enqueued.length, 1); | ||
| strictEqual(queue.enqueued[0].message.attempt, 1); | ||
| const enqueued = recorder.getMeasurements("fedify.queue.task.enqueued"); | ||
| strictEqual(enqueued.length, 1); | ||
| strictEqual(enqueued[0].attributes["fedify.queue.role"], "task"); | ||
| strictEqual(enqueued[0].attributes["fedify.task.name"], "retry-me"); | ||
| strictEqual(enqueued[0].attributes["fedify.queue.task.attempt"], 1); | ||
| }, | ||
| ); |
There was a problem hiding this comment.
📐 Maintainability & Code Quality | 🔵 Trivial | ⚡ Quick win
Consider asserting fedify.queue.task.failed on the retry path too.
This test confirms the enqueued re-enqueue metric but doesn't assert that the same attempt also records fedify.queue.task.failed with fedify.task.failure_reason: "handler". Per #listenTaskMessage (middleware.ts), a scheduled retry and a terminal give-up both return "handler", so both should emit a failed measurement — this is the more surprising half of the task-vs-inbox/outbox distinction called out in docs/manual/opentelemetry.md. Adding this assertion would close a coverage gap on that documented behavior.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@packages/fedify/src/federation/tasks/tasks.test.ts` around lines 1524 - 1552,
The retry-path test in tasks.test.ts only checks the re-enqueue metric and
misses the paired failure metric. Update the test around processQueuedTask to
also assert that `#listenTaskMessage` emits fedify.queue.task.failed with
fedify.task.failure_reason set to "handler" when the handler throws and
retryPolicy schedules a retry. Use the existing recorder assertions alongside
fedify.queue.task.enqueued so the test covers both measurements for the retry
flow.
Resolves #799, the third and final sub-issue of #206 (custom background tasks). Once this lands, #206 is fully resolved.
Background
The core task API (#797/#803) shipped task dispatch behavior and structured logging, but the task worker carries no span and no metrics: of the message variants handled in
processQueuedTask, every other branch (fanout/outbox/inbox) is dispatched with instrumentation, buttask.This PR closes that gap by layering task-specific telemetry onto the decision points the core already established. It reuses the queue-task metric pattern introduced in #759 and mirrors the existing
http_signatures.failure_reasonenum in metrics.ts. It changes no drop/retry behavior: telemetry is observed, never enforced.What changes
Span
Each dequeued task now runs inside a
fedify.taskconsumer span. The name is namespaced underfedify.rather thanactivitypub.because tasks are not part of ActivityPub, paralleling the existingactivitypub.inbox/outbox/fanoutspans. The span:fedify.task.nameandfedify.task.attempt(the zero-based attempt number).fedify.task.failure_reasonand sets its status toERRORon a terminal failure, so trace backends surface failed tasks without re-deriving the reason from logs.Failure attribution
#listenTaskMessagenow returns the failure reason (orundefinedon success) so the span/metric wrapper can attribute it. To distinguish a deserialization failure from a validation failure, the former combinedcodec.decode(...)call is split into its existingdeserializethenvalidatephases. This is behavior-preserving—decodeis literallyvalidate(schema, await deserialize(raw))—andTaskCodecgains a thin instancevalidate()wrapper so the dispatch site can split the two phases without importing the class.The four bounded
fedify.task.failure_reasonvalues map one-to-one to the worker's dispatch decision points:deserialization— the wire payload could not be deserialized.validation— the deserialized payload failed schema validation.unknown_task— the task name has no registered handler.handler— the registered handler threw.A worker shutdown is the one exception: an interrupted attempt is reported as an
abortedoutcome with nofedify.task.failure_reason, never as ahandlerfailure.Metrics surface
Tasks reuse the
fedify.queue.task.*metric family under a newtaskrole:QueueTaskRolegains"task".QueueTaskCommonAttributesgainstaskName, emitted asfedify.task.name.QueueTaskFailureReasontype, mirroringHttpSignatureMetricFailureReason.recordQueueTaskOutcome()gains an optional trailingfailureReasonparameter (non-breaking); it is emitted asfedify.task.failure_reasononly on afailedresult.recordQueueTaskEnqueuedrecordsrole: "task"at both the enqueue site (after a genuine dispatch, never on a dedup skip or a failed enqueue) and the retry re-enqueue site.fedify.queue.backendreports the resolved queue—the one actually used after routing, which may be the outbox queue under the fallback mode—so the metric stays accurate regardless of routing.Cardinality
Bounded by construction: task names are a registered, known-at-startup set (never derived from message content), and
failure_reasonis a four-value bounded enum. Combined cardinality istaskName × |failure_reason| × queue.backend, within OTel attribute safety. The process-localin_flightUpDownCounter omitsfedify.task.nameso its series stays drained.Out of scope
taskName(would risk unbounded cardinality).QueueTaskFailureReasonset—explicitly open to later refinement as long as it stays a small bounded set.Tests
packages/fedify/src/federation/tasks/tasks.test.ts gains a telemetry block with one assertion per acceptance criterion, using
TestSpanExporter/createTestTracerProvider/createTestMeterProviderfrom@fedify/fixture. Coverage:fedify.taskspan exists withfedify.task.nameandfedify.task.attempt.fedify.task.failure_reason.fedify.queue.backendreflects the resolved queue, including the outbox fallback.recordQueueTaskEnqueued/recordQueueTaskOutcomecarryrole: "task".Verified across Deno, Node.js, and Bun.
Documentation
fedify.taskspan row, thetaskvalue added to thefedify.queue.roleenumeration, a widenedfailed-result definition covering acked task drops, and thefedify.task.name/fedify.task.attempt/fedify.task.failure_reasonattribute rows.AI disclosure
Assisted-by: Claude Code:claude-opus-4-8