28 changes: 23 additions & 5 deletions AGENTS.md
@@ -145,11 +145,29 @@ Development workflow
the editor/LSP after editing YAML.
- **Building Packages**: All packages are built automatically as part of
setup. Run `mise run build` to rebuild everything, or
`mise run prepare-each <pkg>` to rebuild just one (without the `@fedify/`
prefix).
- **Checking Code**: Run `mise run check` before committing.
- **Running Tests**: Use `mise run test:deno` for Deno tests or
`mise run test` for all environments.
`mise run prepare-each <pkgs>` to rebuild specific packages (without the
`@fedify/` prefix).
- **Checking Code**: Run `mise run check` before committing, or run
`mise run check-each <pkgs>` to check specific packages. If `check:fmt`,
`check:lint`, or `check:md` reports any issues, refer to the
**Formatting and Linting** section.
- **Formatting and Linting**: Run `mise run fmt` to format all code and docs.
- **Running Tests**:
While testing is certainly important, blindly running every test suite every
time is inefficient. Since Deno executes TS source code directly, it doesn't
waste resources on builds. Therefore, during development, run
`mise run test:deno <TEST_PATH> --filter <TEST_TITLE>` for most tests that
are independent of the runtime. If the test is dependent on a specific
runtime other than Deno, replace `test:deno` with `test:node` or `test:bun`.
Once development is complete, run `mise run test-each <pkgs>` to test the
modified packages (without the `@fedify/` prefix).
Finally, when ready for deployment, run `mise run test` to execute the
full codebase-wide test suite.
- `mise run test`: Executes all the tests in every runtime.
- `mise run test:<runtime:deno,node,bun>`:
Executes all the tests in the given runtime.
- `mise run test-each <pkgs>`: Executes tests in packages that include
`pkgs` in every runtime (without the `@fedify/` prefix).

For detailed contribution guidelines, see *CONTRIBUTING.md*.

4 changes: 3 additions & 1 deletion CHANGES.md
@@ -45,14 +45,16 @@ Version 2.4.0
`FederationOptions.taskDeduplicationTtl` and
`FederationOptions.taskDeduplicationFallback` options.

[[#206], [#797], [#798], [#803], [#806] by ChanHaeng Lee]
[[#206], [#797], [#798], [#799], [#803], [#806], [#812] by ChanHaeng Lee]

[Standard Schema]: https://standardschema.dev/
[#206]: https://github.com/fedify-dev/fedify/issues/206
[#797]: https://github.com/fedify-dev/fedify/issues/797
[#798]: https://github.com/fedify-dev/fedify/issues/798
[#799]: https://github.com/fedify-dev/fedify/issues/799
[#803]: https://github.com/fedify-dev/fedify/pull/803
[#806]: https://github.com/fedify-dev/fedify/pull/806
[#812]: https://github.com/fedify-dev/fedify/pull/812


Version 2.3.1
204 changes: 112 additions & 92 deletions docs/manual/opentelemetry.md

Large diffs are not rendered by default.

77 changes: 74 additions & 3 deletions docs/manual/tasks.md
@@ -349,11 +349,82 @@ collapsed onto one message.
> owns an atomic check.


Observability
-------------

*Task-specific telemetry is available since Fedify 2.4.0.*

Each task the worker dequeues runs inside a `fedify.task` [OpenTelemetry] span.
This is a *consumer* span; because tasks are not part of ActivityPub, it is
namespaced under `fedify.` rather than `activitypub.`. The span inherits the trace
context captured at the enqueue site, so a task's processing chains to the
request or job that enqueued it—and every retry attempt chains to the same
parent. The span carries:

- `fedify.task.name` — the registered task name.
- `fedify.task.attempt` — the zero-based attempt number; a retry re-enqueue
increments it.
- `fedify.task.failure_reason` — set only on a terminal failure, one of the
four bounded values below.

On a terminal failure the span's status is also set to `ERROR`, so trace-based
error views surface dropped and given-up tasks together with their
`fedify.task.failure_reason`. A worker shutdown is the one exception: an
`aborted` attempt leaves the status unset, since an interruption is not a task
failure.
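
The status rule above can be sketched as a small pure function. This is only an illustrative model: the `Outcome` and `SpanSummary` types and the `summarizeTaskSpan` name are hypothetical and are not part of Fedify or the OpenTelemetry API. It also assumes that a completed attempt leaves the status unset, which the text above does not state.

```typescript
// Hypothetical model of a task span's final shape; not Fedify's actual code.
type FailureReason =
  | "deserialization"
  | "validation"
  | "unknown_task"
  | "handler";

type Outcome =
  | { kind: "completed" }
  | { kind: "failed"; reason: FailureReason }
  | { kind: "aborted" };

interface SpanSummary {
  status: "UNSET" | "ERROR";
  attributes: Record<string, string | number>;
}

function summarizeTaskSpan(
  name: string,
  attempt: number,
  outcome: Outcome,
): SpanSummary {
  const attributes: Record<string, string | number> = {
    "fedify.task.name": name,
    "fedify.task.attempt": attempt, // zero-based
  };
  if (outcome.kind === "failed") {
    // Only a terminal failure sets the reason and the ERROR status.
    attributes["fedify.task.failure_reason"] = outcome.reason;
    return { status: "ERROR", attributes };
  }
  // An aborted attempt (worker shutdown) is not a task failure, so the
  // status stays unset. Assumption: a completed attempt is left unset too.
  return { status: "UNSET", attributes };
}
```

Under these assumptions, a trace view that filters on `ERROR` would show dropped and given-up tasks but not attempts interrupted by a shutdown.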

Tasks also reuse the `fedify.queue.task.*` metric family (`enqueued`,
`started`, `completed`, `failed`, `duration`, `in_flight`) that the inbox,
outbox, and fanout workers already report. On the per-run measurements
(`enqueued`, `started`, `completed`, `failed`, `duration`),
`fedify.queue.role` is `task` and `fedify.task.name` names the task; the
process-local `in_flight` UpDownCounter omits `fedify.task.name` so its
increments and decrements pair up cleanly.
`fedify.queue.backend` reflects the queue actually used after routing—so a task
that falls back to the `outboxQueue` (see
[Routing](#queue-routing-and-isolation)) is labeled with the outbox queue's
backend, not a task queue's. A failed outcome
also carries `fedify.task.failure_reason` on `fedify.queue.task.failed` and
`fedify.queue.task.duration`.
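
As a rough illustration of the attribute rules in this paragraph, the sketch below builds the attribute set for each measurement. The `taskMetricAttributes` helper, the `Measurement` type, and the backend strings are hypothetical; whether `in_flight` also carries `fedify.queue.role` and `fedify.queue.backend` is an assumption made here for simplicity.

```typescript
// Hypothetical helper; illustrates the attribute rules, not Fedify's internals.
type Measurement =
  | "enqueued"
  | "started"
  | "completed"
  | "failed"
  | "duration"
  | "in_flight";

type FailureReason =
  | "deserialization"
  | "validation"
  | "unknown_task"
  | "handler";

function taskMetricAttributes(
  measurement: Measurement,
  taskName: string,
  backend: string, // backend of the queue actually used after routing
  failureReason?: FailureReason,
): Record<string, string> {
  const attributes: Record<string, string> = {
    "fedify.queue.role": "task",
    "fedify.queue.backend": backend,
  };
  // The process-local in_flight counter omits the task name so that its
  // increments and decrements land on the same attribute set.
  if (measurement !== "in_flight") {
    attributes["fedify.task.name"] = taskName;
  }
  // A failed outcome carries the reason on `failed` and `duration` only.
  if (
    failureReason != null &&
    (measurement === "failed" || measurement === "duration")
  ) {
    attributes["fedify.task.failure_reason"] = failureReason;
  }
  return attributes;
}
```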

The `fedify.task.failure_reason` attribute takes one of four bounded values,
mapping to the worker's dispatch decision points:

| Value | Meaning |
| ----------------- | -------------------------------------------------- |
| `deserialization` | The wire payload could not be deserialized. |
| `validation` | The deserialized payload failed schema validation. |
| `unknown_task` | The task name has no registered handler. |
| `handler` | The registered handler threw. |

The first three are *drops*: the payload cannot succeed by retrying, so the
worker acknowledges the message and does not re-enqueue it. Telemetry still
records these as a failed outcome with the matching reason, while the queue is
left drained—so a drop is observable without being retried. A `handler`
failure follows the configured retry policy (see
[Retries](#retry-and-error-handling)): an attempt folded into a scheduled
retry records a `completed` outcome, and only the terminal give-up records
`failed` with the `handler` reason. A worker shutdown is never counted as a
failure: an interrupted attempt carries no `fedify.task.failure_reason`—it is
recorded as an `aborted` outcome when the abort propagates (on a
`nativeRetrial` queue) or when the retry policy declines another attempt, and
otherwise folded into a scheduled retry like any handler error.
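
The decision rules above can be read as a single mapping from what happened in an attempt to what gets recorded. The sketch below is one interpretation, not the worker's actual code: the `Attempt` and `Recorded` types and the `recordOutcome` function are hypothetical, and `willRetry` stands for "the retry policy scheduled another attempt".

```typescript
// Hypothetical model of the recorded outcome per attempt.
type DropReason = "deserialization" | "validation" | "unknown_task";
type FailureReason = DropReason | "handler";

type Attempt =
  | { kind: "success" }
  | { kind: "drop"; reason: DropReason }
  | { kind: "handler_error"; willRetry: boolean }
  | { kind: "shutdown"; nativeRetrial: boolean; willRetry: boolean };

interface Recorded {
  result: "completed" | "failed" | "aborted";
  failureReason?: FailureReason;
  reenqueue: boolean; // whether the worker schedules a retry itself
}

function recordOutcome(attempt: Attempt): Recorded {
  switch (attempt.kind) {
    case "success":
      return { result: "completed", reenqueue: false };
    case "drop":
      // Retrying cannot help, so the message is acknowledged and not retried.
      return {
        result: "failed",
        failureReason: attempt.reason,
        reenqueue: false,
      };
    case "handler_error":
      // An attempt folded into a scheduled retry counts as completed;
      // only the terminal give-up is a failure.
      return attempt.willRetry
        ? { result: "completed", reenqueue: true }
        : { result: "failed", failureReason: "handler", reenqueue: false };
    case "shutdown":
      // Never a failure: aborted when the abort propagates (nativeRetrial)
      // or when the policy declines; otherwise folded into a retry.
      return attempt.nativeRetrial || !attempt.willRetry
        ? { result: "aborted", reenqueue: false }
        : { result: "completed", reenqueue: true };
  }
}
```

In this reading, `fedify.task.failure_reason` appears only when `result` is `failed`, which matches the four bounded values in the table.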

The bounded value set keeps metric cardinality finite. A metric's task name is
always a registered, known-at-startup value, never one derived from message
content. An `unknown_task` drop carries a wire-supplied name, so that name is kept off the
metrics (it still appears on the span, which does not aggregate into time
series). See the [OpenTelemetry](./opentelemetry.md) manual for the full span,
attribute, and metric reference.
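
A minimal sketch of that cardinality guard, assuming the set of registered task names is available when the metric is recorded. The `metricTaskName` helper and the example task names are hypothetical.

```typescript
// Hypothetical guard: only registered names may become metric attributes.
function metricTaskName(
  wireName: string,
  registered: ReadonlySet<string>,
): string | undefined {
  // A wire-supplied name that matches no registered task is unbounded input,
  // so it is withheld from metrics (it may still be put on the span).
  return registered.has(wireName) ? wireName : undefined;
}

const registeredTasks: ReadonlySet<string> = new Set([
  "send-email",
  "resize-image",
]);

const knownName = metricTaskName("send-email", registeredTasks);
const unknownName = metricTaskName("x-attacker-chosen-123", registeredTasks);
```

Here `knownName` is `"send-email"` and `unknownName` is `undefined`, so an `unknown_task` drop would be counted without adding a new time series per bogus name.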

[OpenTelemetry]: https://opentelemetry.io/


Limitations
-----------

The current API intentionally ships without task-specific OpenTelemetry spans
and metrics, cron-style periodic scheduling, result backends, and per-task
priority. Some of these are planned as follow-ups; see the [tracking issue].
The current API intentionally ships without cron-style periodic scheduling,
result backends, and per-task priority. Some of these are planned as
follow-ups; see the [tracking issue].

[tracking issue]: https://github.com/fedify-dev/fedify/issues/206
2 changes: 1 addition & 1 deletion mise.toml
@@ -246,9 +246,9 @@ for pkg in ($env.usage_packages | split row " " | where ($it | is-not-empty)) {
'''

# Testing

[tasks."test:deno"]
description = "Run the test suite using Deno"
depends = ["build"]
run = "deno test --check --doc --allow-all --unstable-kv --trace-leaks --parallel"

[tasks."test:node"]
5 changes: 4 additions & 1 deletion packages/cli/src/startup.test.ts
@@ -5,7 +5,10 @@ import test from "node:test";
import { fileURLToPath } from "node:url";

const packageDir = resolve(dirname(fileURLToPath(import.meta.url)), "..");
test("CLI build keeps the init command bridge", async () => {

test("CLI build keeps the init command bridge", {
skip: "Deno" in globalThis,
}, async () => {
const entrypoint = resolve(packageDir, "dist/mod.js");
const commandBridge = resolve(packageDir, "dist/commands.js");
await access(entrypoint);
1 change: 1 addition & 0 deletions packages/create/src/package.test.ts
@@ -8,6 +8,7 @@ const packageDir = resolve(dirname(fileURLToPath(import.meta.url)), "..");

test(
"package.json entrypoints match built create CLI",
{ skip: "Deno" in globalThis },
async () => {
const packageJson = JSON.parse(
await readFile(resolve(packageDir, "package.json"), "utf8"),
36 changes: 34 additions & 2 deletions packages/fedify/src/federation/metrics.ts
@@ -16,7 +16,7 @@ import type { MessageQueue } from "./mq.ts";
* The role of a queued task, derived from the queued message's `type` field.
* @since 2.3.0
*/
export type QueueTaskRole = "fanout" | "outbox" | "inbox";
export type QueueTaskRole = "fanout" | "outbox" | "inbox" | "task";

/**
* The terminal result of a queued task processing attempt.
@@ -91,6 +91,13 @@ export interface QueueTaskCommonAttributes {
role: QueueTaskRole;
queue?: MessageQueue;
activityType?: string;

/**
* The registered name of a custom background task, emitted as the
* `fedify.task.name` attribute. Set only for the `"task"` role.
* @since 2.4.0
*/
taskName?: string;
}

/**
@@ -209,6 +216,23 @@ export type HttpSignatureMetricFailureReason =
| "invalidSignature"
| "keyFetchError";

/**
* The reason a custom background task terminated unsuccessfully, emitted as the
* `fedify.task.failure_reason` attribute. A small bounded set mapping to the
* worker's dispatch decision points; open to later refinement.
*
* - `deserialization`: the wire payload could not be deserialized.
* - `validation`: the deserialized payload failed schema validation.
* - `unknown_task`: the task name has no registered handler.
* - `handler`: the registered handler threw.
* @since 2.4.0
*/
export type QueueTaskFailureReason =
| "deserialization"
| "validation"
| "unknown_task"
| "handler";

/**
* Bounded values recorded as `ld_signatures.type` on the signature
* verification duration histogram. Fedify only signs and verifies
@@ -987,10 +1011,11 @@ class FederationMetrics {
recordQueueTaskEnqueued(
common: QueueTaskCommonAttributes,
attempt: number,
count = 1,
): void {
const attributes = buildQueueTaskAttributes(common);
attributes["fedify.queue.task.attempt"] = attempt;
this.queueTaskEnqueued.add(1, attributes);
this.queueTaskEnqueued.add(count, attributes);
}

recordQueueTaskStarted(common: QueueTaskCommonAttributes): void {
@@ -1009,9 +1034,13 @@ class FederationMetrics {
common: QueueTaskCommonAttributes,
result: QueueTaskResult,
durationMs: number,
failureReason?: QueueTaskFailureReason,
): void {
const attributes = buildQueueTaskAttributes(common);
attributes["fedify.queue.task.result"] = result;
if (failureReason != null && result === "failed") {
attributes["fedify.task.failure_reason"] = failureReason;
}
if (result === "completed") {
this.queueTaskCompleted.add(1, attributes);
} else if (result === "failed") {
@@ -1197,6 +1226,9 @@ function buildQueueTaskAttributes(
if (common.activityType != null) {
attributes["activitypub.activity.type"] = common.activityType;
}
if (common.taskName != null) {
attributes["fedify.task.name"] = common.taskName;
}
return attributes;
}

7 changes: 5 additions & 2 deletions packages/fedify/src/federation/middleware.test.ts
@@ -10506,8 +10506,11 @@ test("createFederation() omits instrumentation when no meterProvider is set", ()
});

const taskCodec = new TaskCodec({ contextLoader: mockDocumentLoader });
const decodeEnvelope = (message: TaskMessage): Promise<Envelope> =>
taskCodec.decode(envelopeSchema, message.data);
const decodeEnvelope = async (message: TaskMessage): Promise<Envelope> => {
const decoded = await taskCodec.decode(envelopeSchema, message.data);
if (!decoded.ok) throw decoded.error;
return decoded.value;
};
const envelope = (title: string): Envelope => ({
note: new Note({ content: title }),
title,