16 changes: 14 additions & 2 deletions CHANGES.md
@@ -34,13 +34,25 @@ Version 2.4.0
`FederationOptions.taskQueueResolution` option is set to `"strict"`.
`Federation.startQueue()` now accepts `queue: "task"` to run
a task-only worker.

- [[#206], [#797], [#803] by ChanHaeng Lee]
- Tasks can request at-most-once enqueue with a `deduplicationKey`
(new `TaskEnqueueOptions.deduplicationKey`). A queue declaring the new
`MessageQueue.nativeDeduplication` capability owns the check and
receives the key through the new
`MessageQueueEnqueueOptions.deduplicationKey`; otherwise Fedify
performs a best-effort key–value guard through the optional
`KvStore.cas` primitive, under a new `taskDeduplication` key prefix.
The marker TTL and the no-`cas` fallback are tunable with the new
`FederationOptions.taskDeduplicationTtl` and
`FederationOptions.taskDeduplicationFallback` options.

+ [[#206], [#797], [#798], [#803], [#806] by ChanHaeng Lee]

[Standard Schema]: https://standardschema.dev/
[#206]: https://github.com/fedify-dev/fedify/issues/206
[#797]: https://github.com/fedify-dev/fedify/issues/797
[#798]: https://github.com/fedify-dev/fedify/issues/798
[#803]: https://github.com/fedify-dev/fedify/pull/803
[#806]: https://github.com/fedify-dev/fedify/pull/806


Version 2.3.0
101 changes: 96 additions & 5 deletions docs/manual/tasks.md
@@ -1,7 +1,9 @@
<!-- deno-fmt-ignore-file -->

Background tasks
================

- *This API is available since Fedify 2.x.x.*
+ *This API is available since Fedify 2.4.0.*

Fedify already processes outgoing and incoming activities on background
workers through its [message queue](./mq.md). The custom background task API
@@ -139,6 +141,10 @@ Both methods accept options:
: Tasks with the same ordering key are processed sequentially (one at
a time), like the same option on the message queue layer.

`deduplicationKey`
: Requests at-most-once enqueue for tasks that share the key; see
[Deduplication](#deduplication) below.

~~~~ typescript
await ctx.enqueueTask(sendDigest, payload, {
delay: { minutes: 30 },
@@ -257,12 +263,97 @@ delivered it.
> queue and set `taskQueueResolution: "strict"`.


Deduplication
-------------

A task often needs *at-most-once-per-key* enqueue: a digest mailer must not
send twice when a request is retried, and a cleanup job should coalesce
duplicate triggers. Passing a `deduplicationKey` requests this—while the
first enqueue is still within the deduplication window, a second enqueue
with the same key is dropped. Whether that drop actually happens depends
on the queue and key–value store, as the fallback rules below decide:

~~~~ typescript
await ctx.enqueueTask(sendDigest, payload, {
deduplicationKey: `digest:${payload.userId}`, // [!code highlight]
});
~~~~

How the key is resolved depends on the queue and the key–value store:

1. *Native backend.* When the task's queue declares
`~MessageQueue.nativeDeduplication`, Fedify forwards the key in the
message queue's `~MessageQueueEnqueueOptions.deduplicationKey` and the
backend owns the check. Fedify does not touch the key–value store.

2. *Key–value fallback.* Otherwise, if the configured `KvStore` exposes
the optional compare-and-swap (`~KvStore.cas`) primitive, Fedify records
the key under a dedicated `taskDeduplication` prefix with a TTL and skips
the enqueue while a marker is present. The TTL defaults to one hour and is
configurable with `~FederationOptions.taskDeduplicationTtl`:

~~~~ typescript
const federation = createFederation<void>({
// ...
taskDeduplicationTtl: { minutes: 10 }, // [!code highlight]
});
~~~~

3. *No conditional write.* When neither applies—no native deduplication and
a key–value store without `~KvStore.cas`—the behavior is governed by
`~FederationOptions.taskDeduplicationFallback`. `"open"` (the default)
lets the enqueue proceed without deduplication after a debug-level log;
`"closed"` throws a `TypeError` before enqueuing:

~~~~ typescript
const federation = createFederation<void>({
// ...
taskDeduplicationFallback: "closed", // [!code highlight]
});
~~~~
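
The three rules above can be read as a single decision function. The following is a minimal sketch, not Fedify's implementation: `QueueLike`, `KvLike`, and `resolveDeduplication` are hypothetical names introduced for illustration, and only the presence of `cas` is modeled, not its real signature.

```typescript
// Hypothetical, pared-down shapes: only the members the three rules consult.
interface QueueLike {
  readonly nativeDeduplication?: boolean;
}

interface KvLike {
  // Presence is all that matters here; the real signature is not modeled.
  readonly cas?: (...args: never[]) => Promise<boolean>;
}

type DeduplicationFallback = "open" | "closed";
type DeduplicationStrategy = "native" | "kv" | "none";

function resolveDeduplication(
  queue: QueueLike,
  kv: KvLike,
  fallback: DeduplicationFallback = "open",
): DeduplicationStrategy {
  // Rule 1: the backend owns the check; the key-value store is not touched.
  if (queue.nativeDeduplication === true) return "native";
  // Rule 2: a compare-and-swap primitive enables the best-effort marker guard.
  if (typeof kv.cas === "function") return "kv";
  // Rule 3: no conditional write is available, so the fallback option decides.
  if (fallback === "closed") {
    throw new TypeError(
      "deduplicationKey was given, but neither native deduplication nor cas is available.",
    );
  }
  return "none"; // "open": enqueue proceeds without deduplication
}
```

Under this reading, a deployment on a key–value store without `cas` gets `"none"` by default and only fails loudly once `taskDeduplicationFallback` is set to `"closed"`.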

Among the first-party adapters, the in-memory, Deno KV, SQLite, and MySQL
key–value stores implement `~KvStore.cas`; PostgreSQL, Redis, and
Cloudflare Workers KV do not yet, so those deployments take the
`taskDeduplicationFallback` branch until per-adapter follow-ups add it.

For `~Context.enqueueTaskMany()`, a single `deduplicationKey` applies to the
whole batch: the batch enqueues as a unit or is skipped as a unit, never
partially. Per-item deduplication means calling `~Context.enqueueTask()` in
a loop, each with its own key. Deduplicating a multi-item batch requires the
queue to implement `~MessageQueue.enqueueMany()` so the batch enqueues
atomically—whether the check is native or the key–value fallback. Fanning the
key out across separate `~MessageQueue.enqueue()` calls cannot enqueue a whole
batch as one unit: a native per-message key cannot cover it, and a key–value
marker could not be rolled back cleanly if only some of the fanned-out enqueues
failed. So when deduplication is actually applied—a native queue, or a
key–value store with `~KvStore.cas`—Fedify rejects a multi-item batch with a
`deduplicationKey` on a queue without `~MessageQueue.enqueueMany()` instead of
risking duplicates. Under the `"open"` fallback (no native deduplication and no
`cas`), no marker is taken, so the batch simply fans out without deduplication.

This applies through `ParallelMessageQueue` as well: wrapping a queue that
lacks `~MessageQueue.enqueueMany()` does not make batch enqueue atomic, so a
deduplicated multi-item batch on such a wrapper is likewise rejected rather than
collapsed onto one message.
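
The batch rules in the two paragraphs above can be sketched as a small planning function. This is an illustration under stated assumptions rather than Fedify's code: `BatchPlanInput` and `planBatch` are hypothetical names, and `strategy` stands for whichever deduplication path applies (native, key–value, or none under the `"open"` fallback).

```typescript
type DeduplicationStrategy = "native" | "kv" | "none";

// Hypothetical input describing one enqueueTaskMany() call.
interface BatchPlanInput {
  readonly itemCount: number;
  readonly deduplicationKey?: string;
  readonly strategy: DeduplicationStrategy;
  readonly hasEnqueueMany: boolean;
}

type BatchPlan = "bulk" | "fan-out";

function planBatch(input: BatchPlanInput): BatchPlan {
  // A queue with enqueueMany() can enqueue the batch as one unit.
  if (input.hasEnqueueMany) return "bulk";
  // Deduplication is "actually applied" only when a key is given and a
  // native queue or a cas-capable store backs it.
  const deduplicated = input.deduplicationKey != null &&
    input.strategy !== "none";
  if (deduplicated && input.itemCount > 1) {
    // Fanning out could enqueue only part of the batch, so reject instead.
    throw new TypeError(
      "A deduplicated multi-item batch needs a queue that implements enqueueMany().",
    );
  }
  // No deduplication in effect, or a single item: separate enqueues are fine.
  return "fan-out";
}
```

The sketch treats a single-item batch as safe to fan out, since the text above restricts the rejection to multi-item batches.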

> [!WARNING]
> The key–value fallback is *best-effort, not transactional*. The marker
> write and the enqueue are separate operations. Fedify rolls the marker back
> when an enqueue fails, so a transient failure does not suppress the retry, but
> a crash before that rollback, the `"open"` fallback under concurrency, a
> non-atomic third-party `~KvStore.cas`, or reuse of a key within its TTL window
> can still admit a duplicate or suppress a task. Cleanup is otherwise by TTL
> expiry, not active deletion on handler success. Deployments needing strict
> guarantees use a queue with `nativeDeduplication: true`, where the backend
> owns an atomic check.
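
To make the best-effort nature concrete, here is a minimal sketch of a marker guard with rollback. It assumes a hypothetical in-memory `MarkerStore` with a set-if-absent operation standing in for `KvStore.cas`; the class, its methods, and `enqueueOnce` are illustrative names, not Fedify APIs.

```typescript
// Hypothetical in-memory stand-in for a store with a conditional write.
class MarkerStore {
  readonly expiry = new Map<string, number>();
  readonly now: () => number;

  constructor(now: () => number = () => Date.now()) {
    this.now = now;
  }

  // Takes the marker unless an unexpired one exists; returns whether it was taken.
  tryAcquire(key: string, ttlMs: number): boolean {
    const t = this.now();
    const current = this.expiry.get(key);
    if (current !== undefined && current > t) return false;
    this.expiry.set(key, t + ttlMs);
    return true;
  }

  release(key: string): void {
    this.expiry.delete(key);
  }
}

async function enqueueOnce(
  store: MarkerStore,
  key: string,
  ttlMs: number,
  enqueue: () => Promise<void>,
): Promise<"enqueued" | "skipped"> {
  // Step 1: the marker write. A second caller within the TTL is skipped.
  if (!store.tryAcquire(key, ttlMs)) return "skipped";
  try {
    // Step 2: the enqueue, a separate operation from the marker write.
    await enqueue();
  } catch (error) {
    // Roll the marker back so a transient failure does not suppress a retry.
    store.release(key);
    throw error;
  }
  // On success the marker stays until its TTL expires; nothing deletes it.
  return "enqueued";
}
```

A process crash between step 1 and the rollback leaves the marker in place, which is the window in which a task can be suppressed until the TTL runs out.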


Limitations
-----------

- The current API intentionally ships without deduplication, task-specific
- OpenTelemetry spans and metrics, cron-style periodic scheduling, result
- backends, and per-task priority. Some of these are planned as follow-ups;
- see the [tracking issue].
+ The current API intentionally ships without task-specific OpenTelemetry spans
+ and metrics, cron-style periodic scheduling, result backends, and per-task
+ priority. Some of these are planned as follow-ups; see the [tracking issue].

[tracking issue]: https://github.com/fedify-dev/fedify/issues/206
16 changes: 10 additions & 6 deletions packages/fedify/src/federation/context.ts
@@ -449,7 +449,7 @@ export interface Context<TContextData> {
* @throws {TypeError} If the task is not defined on this federation,
* if no message queue is configured for tasks, or if
* the payload fails schema validation.
- * @since 2.x.x
+ * @since 2.4.0
*/
enqueueTask<TData>(
task: TaskDefinition<TContextData, TData>,
@@ -459,18 +459,22 @@ export interface Context<TContextData> {

/**
* Enqueues multiple payloads for a custom background task at once.
- * Uses the queue's bulk enqueue operation when available, falling back
- * to parallel single enqueues.
+ * Uses the queue's bulk enqueue operation when available. Without
+ * deduplication, it may fall back to parallel single enqueues when the
+ * queue does not implement bulk enqueue.
* @template TData The type of the task payload, inferred from the task's
* schema.
* @param task The handle returned by {@link TaskRegistry.defineTask}.
* @param payloads The task payloads. Each is validated against the
* task's schema before being enqueued.
* @param options Options for enqueuing the tasks.
* @throws {TypeError} If the task is not defined on this federation,
- * if no message queue is configured for tasks, or if
- * a payload fails schema validation.
- * @since 2.x.x
+ * if no message queue is configured for tasks, if
+ * a payload fails schema validation, or if a
+ * deduplicated multi-item batch cannot be enqueued
+ * atomically because the queue does not implement
+ * bulk enqueue.
+ * @since 2.4.0
*/
enqueueTaskMany<TData>(
task: TaskDefinition<TContextData, TData>,
32 changes: 29 additions & 3 deletions packages/fedify/src/federation/federation.ts
@@ -13,7 +13,6 @@ import type {
import type { MeterProvider, TracerProvider } from "@opentelemetry/api";
import type { ActivityTransformer } from "../compat/types.ts";
import type { HttpMessageSignaturesSpec } from "../sig/http.ts";
- import type { CircuitBreakerOptions } from "./circuit-breaker.ts";
import type {
ActorAliasMapper,
ActorDispatcher,
@@ -39,6 +38,7 @@ import type {
UnverifiedActivityHandler,
WebFingerLinksDispatcher,
} from "./callback.ts";
+ import type { CircuitBreakerOptions } from "./circuit-breaker.ts";
import type { Context, InboxContext, RequestContext } from "./context.ts";
import type { KvStore } from "./kv.ts";
import type {
@@ -1086,7 +1086,7 @@ export interface FederationOptions<TContextData> {
* this uses an exponential backoff strategy with a maximum of 10 attempts
* and a maximum delay of 12 hours. A per-task retry policy
* ({@link TaskDefinitionOptions.retryPolicy}) overrides this.
- * @since 2.x.x
+ * @since 2.4.0
*/
taskRetryPolicy?: RetryPolicy;

@@ -1099,10 +1099,36 @@ export interface FederationOptions<TContextData> {
* - `"strict"`: no fallback; enqueuing the task throws instead of
* silently sharing the outbox queue.
* @default `"fallback"`
- * @since 2.x.x
+ * @since 2.4.0
*/
taskQueueResolution?: "fallback" | "strict";

/**
* The time-to-live for a {@link TaskEnqueueOptions.deduplicationKey} marker
* stored in the key–value deduplication fallback. A second enqueue with the
* same key within this window is skipped; once it expires, the key may
* enqueue again. Ignored when the task's queue declares
* {@link MessageQueue.nativeDeduplication} (the backend owns the window).
* @default `{ hours: 1 }`
* @since 2.4.0
*/
taskDeduplicationTtl?: Temporal.DurationLike;

/**
* The behavior when a {@link TaskEnqueueOptions.deduplicationKey} is supplied
* but the task's queue does not declare
* {@link MessageQueue.nativeDeduplication} *and* the configured
* {@link KvStore} exposes no `cas` (compare-and-swap) primitive:
*
* - `"open"` (the default): proceeds without deduplication after logging at
* debug level.
* - `"closed"`: rejects with a `TypeError` before enqueuing.
*
* @default `"open"`
* @since 2.4.0
*/
taskDeduplicationFallback?: "open" | "closed";

/**
* Activity transformers that are applied to outgoing activities. It is
* useful for adjusting outgoing activities to satisfy some ActivityPub
17 changes: 10 additions & 7 deletions packages/fedify/src/federation/middleware.test.ts
@@ -10565,20 +10565,23 @@ const withTimeout = <T>(
return Promise.race([promise, timeout]).finally(() => clearTimeout(timer));
};

- const taskFederationOptions = {
+ // A factory, not a shared constant: each task test gets its own
+ // MemoryKvStore so deduplication markers never leak across tests and the
+ // suite stays order-independent as more cases are added.
+ const mockOptions = () => ({
kv: new MemoryKvStore(),
documentLoaderFactory: () => mockDocumentLoader,
contextLoaderFactory: () => mockDocumentLoader,
manuallyStartQueue: true,
- };
+ });

test("ContextImpl.enqueueTask()", async (t) => {
await t.step(
"builds the task message envelope and round-trips a vocab payload",
async () => {
const queue = new MockQueue({ supportsEnqueueMany: true });
const federation = createFederation<void>({
- ...taskFederationOptions,
+ ...mockOptions(),
queue: { task: queue },
});
const task = federation.defineTask("greet", {
@@ -10619,7 +10622,7 @@ test("ContextImpl.enqueueTaskMany()", async (t) => {
async () => {
const queue = new MockQueue({ supportsEnqueueMany: true });
const federation = createFederation<void>({
- ...taskFederationOptions,
+ ...mockOptions(),
queue: { task: queue },
});
const task = federation.defineTask("bulk", {
@@ -10652,7 +10655,7 @@ test("ContextImpl.enqueueTaskMany()", async (t) => {
async () => {
const queue = new MockQueue({ supportsEnqueueMany: true });
const federation = createFederation<void>({
- ...taskFederationOptions,
+ ...mockOptions(),
queue: { task: queue },
});
const task = federation.defineTask("bulk-single", {
@@ -10674,7 +10677,7 @@ test("ContextImpl.enqueueTaskMany()", async (t) => {
async () => {
const queue = new RendezvousQueue(2);
const federation = createFederation<void>({
- ...taskFederationOptions,
+ ...mockOptions(),
queue: { task: queue },
});
const task = federation.defineTask("bulk-fallback", {
@@ -10716,7 +10719,7 @@ test("ContextImpl.enqueueTaskMany()", async (t) => {
async () => {
const queue = new MockQueue();
const federation = createFederation<void>({
- ...taskFederationOptions,
+ ...mockOptions(),
queue: { task: queue },
});
const task = federation.defineTask("bulk-typed", {