feat(retention): export/clean/rehydrate endpoints for task content#243
Conversation
Adds an operational surface for bounded retention of task chat content
in shared infrastructure. Callers can snapshot a task's content,
delete it from the shared stores, and later restore it byte-identically
from the snapshot — preserving message IDs and timestamps so tool-call
and reasoning references remain valid.
Three new endpoints under /tasks/{task_id}:
- GET /export — returns a self-contained snapshot (messages + task_states)
- POST /clean — deletes content across Mongo messages, Mongo task_states,
Postgres events; resets agent_task_tracker cursors; sets tasks.cleaned_at
- POST /rehydrate — restores content from a snapshot, clears cleaned_at
Domain layer lives in TaskRetentionService so the eventual scheduled
sweep workflow and the HTTP endpoints share the same code path.
Cleanup uses a "Mongo deletes first, Postgres marker last" order so
retries after partial failure converge correctly. The active-task,
idle-threshold, and unprocessed-events guards refuse cleanup when the
task isn't safe to drop.
Schema:
- New nullable tasks.cleaned_at column (TIMESTAMPTZ, metadata-only ALTER)
- No new audit table — cleanup operations emit structured log lines
Other changes:
- adapter_mongodb.batch_create now translates pymongo BulkWriteError
with all-duplicate-key sub-errors into DuplicateItemError (HTTP 400)
instead of letting it surface as ServiceError (HTTP 500)
- New EventRepository.delete_by_task_id and
AgentTaskTrackerRepository.reset_cursors_for_task methods
Tests: 13 integration tests covering happy paths, all precondition
guards, and the byte-identical export → clean → rehydrate round-trip.
✱ Stainless preview buildsThis PR will update the openapi python typescript
|
… task_ids
Two P1 issues from review.
**Authorization (security)**
The three retention endpoints were inheriting only the global auth
middleware, not the resource-level authorization that every other
/tasks/{task_id}/* route enforces. Any authenticated principal could
export, clean, or rehydrate a task they don't own.
Adds DAuthorizedId to all three handlers matching the existing pattern:
- export → AuthorizedOperationType.read
- clean → AuthorizedOperationType.delete
- rehydrate → AuthorizedOperationType.update
**Per-entity task_id validation**
snapshot.task_id was checked against the path task_id, but each embedded
TaskMessageEntity and StateEntity carries its own task_id field that
batch_create forwards straight to MongoDB. A caller could pass
snapshot.task_id = "A" with messages whose task_id = "B" and pollute
task B's collection — Mongo has no FK to reject it.
Adds explicit per-item validation in rehydrate_task before any insert.
Returns 400 with the offending index in the message so the caller can
find the bad entry.
Tests: 2 new integration tests covering the mismatched-task_id cases
for both messages and task_states. Full suite (15 tests) still passes.
danielmillerp
left a comment
There was a problem hiding this comment.
looks good mostly just questions
| export → clean → rehydrate is a round-trip-equivalent operation. | ||
| """ | ||
| snapshot = await use_case.export_task(task_id) | ||
| return ExportTaskResponse.model_validate(snapshot) |
There was a problem hiding this comment.
this could probs be used for the cengage use case too, but looks like we are holding this in memory and then sending over wire, do we want to stream here, or output to s3 file ?
There was a problem hiding this comment.
The use case I remember just wanted a list of task IDs and task names, so this would probably be too much data (since it also exports the messages as well). Not sure if there's another use case you had in mind where they also care about chat content as well.
Either way, you're totally correct that we're holding this in memory and sending this over the wire for now. Like the rehydrate case, I think the cleanest way to support larger amounts of data here would be to allow an optional presigned upload URL where we can then output this data to (to avoid worrying about long running streaming requests or having to manage which files we upload to with which cloud providers)
| workflow calls the same use case (TaskRetentionUseCase.clean_task), not | ||
| these endpoints. | ||
|
|
||
| Authorization mirrors the existing /tasks routes via DAuthorizedId: |
There was a problem hiding this comment.
confirming, we get this for free with tasks auth z work being done right?
There was a problem hiding this comment.
right now, this just uses the DAuthorizedId shortcuts which relies on agentex-auth and uses the existing SGP permissions table so we're getting this for free based on the initial implementation (not sure if that's the authz work you were referring to being done)
| "unprocessed-events checks still apply. Admin use only." | ||
| ), | ||
| ) | ||
| idle_days: int = Field( |
| async def delete_by_task_id(self, task_id: str) -> int: | ||
| """Delete all events for a task. Idempotent. Returns rows deleted.""" | ||
| async with ( | ||
| self.start_async_db_session(True) as session, |
There was a problem hiding this comment.
what happens if we force clean and then new messages come in as we are cleaning?
There was a problem hiding this comment.
for events, we have a lock during the session that does the delete so we will only be able to persist new events after the delete finishes.
for messages, we page through existing messages so if we have new messages coming in then they will get returned by one of the subsequent page fetches and also get deleted.
ultimately we don't have a good way to acquire a lock on postgres and mongodb data at once so beyond aiming to run this cleanup during off hours to reduce the risk of a race, the only other thing i can think of doing is to verify that the task is still idle before each batch of deletes.
There was a problem hiding this comment.
actually, thinking through this more - once the cleanup has started, the only real way to leave this task in a consistent state is to finish the cleanup. if we abort half way through, then it ends up being in neither a cleaned nor a rehydrated state and we don't have any way to roll back the cleaning since the rehydration data would be coming in from outside the system.
For very long conversations (deep reasoning content, extensive tool
traces, many attachments), the rehydrate JSON body can exceed proxy /
ALB body-size limits. Same risk on the export response.
Adds an opt-in URL path to both operations:
- POST /tasks/{task_id}/export (new) — Agentex builds the snapshot and
PUTs it as JSON to a caller-supplied presigned upload URL. Returns a
small ack with byte/item counts.
- POST /tasks/{task_id}/rehydrate (extended) — accepts an optional
snapshot_url field. When set, Agentex GETs the URL and parses the body
as a TaskSnapshotEntity. Mutually exclusive with inline content; the
Pydantic validator rejects mixed payloads.
The existing inline paths (GET /export, POST /rehydrate with inline
content) are unchanged. Calls that fit in a JSON body still take the
direct route — only oversized callers need to use object storage.
SSRF guard (utils/url_validation.py): deny-known-bad. Requires https,
resolves the hostname, and rejects any address that is_private,
is_loopback, is_link_local (covers 169.254.169.254 cloud metadata),
is_reserved, is_multicast, or is_unspecified. Uses async DNS so it
doesn't block the event loop.
Format: plain JSON for both directions (round-trip-symmetric with the
inline path). Gzipped/streaming are future possibilities.
Caps: no explicit byte/timeout limits in v1. If real callers hit issues
we'll add them then.
Sync: callers wait for upload/download to complete. Async-with-status
is a follow-up if needed.
Tests: 6 new integration tests covering happy paths (mock httpx
PUT/GET, verify body == inline export, verify byte-identical round-trip
through URL form), SSRF rejection (127.0.0.1), scheme rejection (http),
and the inline-vs-URL mutual-exclusion validator. Total suite 21
passing.
Other:
- TaskRetentionService now depends on DHttpxClient (already a global
singleton via GlobalDependencies)
- New TaskExportToUrlResultEntity carries the upload ack shape
| @model_validator(mode="after") | ||
| def _exactly_one_source(self): | ||
| has_inline = bool(self.messages or self.task_states) | ||
| has_url = self.snapshot_url is not None | ||
| if has_inline and has_url: | ||
| raise ValueError( | ||
| "Provide inline content (messages/task_states) OR snapshot_url, " | ||
| "not both." | ||
| ) | ||
| return self |
There was a problem hiding this comment.
The
_exactly_one_source validator guards against providing both an inline payload and snapshot_url, but it does not guard against providing neither. A request with only task_id (empty lists, no URL) passes validation; the route then constructs an empty TaskSnapshotEntity and calls rehydrate_task, which skips all inserts and clears cleaned_at. The task ends up marked as active again with no messages or task states restored — an inconsistent state that silently discards the clean/rehydrate invariant.
| @model_validator(mode="after") | |
| def _exactly_one_source(self): | |
| has_inline = bool(self.messages or self.task_states) | |
| has_url = self.snapshot_url is not None | |
| if has_inline and has_url: | |
| raise ValueError( | |
| "Provide inline content (messages/task_states) OR snapshot_url, " | |
| "not both." | |
| ) | |
| return self | |
| @model_validator(mode="after") | |
| def _exactly_one_source(self): | |
| has_inline = bool(self.messages or self.task_states) | |
| has_url = self.snapshot_url is not None | |
| if has_inline and has_url: | |
| raise ValueError( | |
| "Provide inline content (messages/task_states) OR snapshot_url, " | |
| "not both." | |
| ) | |
| if not has_inline and not has_url: | |
| raise ValueError( | |
| "Provide exactly one of: inline content (messages/task_states) " | |
| "or snapshot_url." | |
| ) | |
| return self |
Prompt To Fix With AI
This is a comment left during a code review.
Path: agentex/src/api/schemas/task_retention.py
Line: 73-82
Comment:
The `_exactly_one_source` validator guards against providing *both* an inline payload and `snapshot_url`, but it does not guard against providing *neither*. A request with only `task_id` (empty lists, no URL) passes validation; the route then constructs an empty `TaskSnapshotEntity` and calls `rehydrate_task`, which skips all inserts and clears `cleaned_at`. The task ends up marked as active again with no messages or task states restored — an inconsistent state that silently discards the clean/rehydrate invariant.
```suggestion
@model_validator(mode="after")
def _exactly_one_source(self):
has_inline = bool(self.messages or self.task_states)
has_url = self.snapshot_url is not None
if has_inline and has_url:
raise ValueError(
"Provide inline content (messages/task_states) OR snapshot_url, "
"not both."
)
if not has_inline and not has_url:
raise ValueError(
"Provide exactly one of: inline content (messages/task_states) "
"or snapshot_url."
)
return self
```
How can I resolve this? If you propose a fix, please make it concise.There was a problem hiding this comment.
Addressed in 243924b — applied the validator change; also added test_rehydrate_rejects_empty_payload and adjusted two existing tests that previously sent empty payloads to hit downstream guards.
Greptile P1: the _exactly_one_source validator rejected "both inline AND url" but allowed "neither". A request with just task_id, empty messages/task_states, no snapshot_url passed validation, then rehydrate_task would silently skip all inserts and clear cleaned_at — leaving the task active with no content. Whatever was deleted at clean time is permanently gone. Validator now rejects empty payloads with the same shape of error as the "both" case. Two existing tests adjusted to send a real message so they reach the cleaned_at / task_id checks they were meant to exercise. New test_rehydrate_rejects_empty_payload codifies the empty-rejection so a future refactor can't silently regress. The "rehydrate an empty cleaned task" case is now unsupported. If that's ever needed, an explicit allow_empty flag is the right way to opt in; we have no use case for it today.
| logger.info( | ||
| "task_export_to_url_completed", | ||
| extra={ | ||
| "task_id": result.task_id, | ||
| "upload_url": result.upload_url, | ||
| "uploaded_bytes": result.uploaded_bytes, | ||
| "messages_count": result.messages_count, | ||
| "task_states_count": result.task_states_count, | ||
| }, | ||
| ) |
There was a problem hiding this comment.
Presigned URL credentials written to Datadog logs
upload_url is a presigned S3/GCS PUT URL whose query parameters embed the access key, date, signature, and expiry. Logging it verbatim means any user with Datadog log access can reuse the URL to overwrite the object within its validity window. The forensic goal (knowing where the upload went) is achievable by logging only the base URL (scheme + host + path, stripping query parameters) — that uniquely identifies the storage object without exposing the credential.
Strip query params before logging: urllib.parse.urlunparse(parsed._replace(query="")) where parsed = urlparse(upload_url).
Prompt To Fix With AI
This is a comment left during a code review.
Path: agentex/src/domain/services/task_retention_service.py
Line: 161-170
Comment:
**Presigned URL credentials written to Datadog logs**
`upload_url` is a presigned S3/GCS PUT URL whose query parameters embed the access key, date, signature, and expiry. Logging it verbatim means any user with Datadog log access can reuse the URL to overwrite the object within its validity window. The forensic goal (knowing where the upload went) is achievable by logging only the base URL (scheme + host + path, stripping query parameters) — that uniquely identifies the storage object without exposing the credential.
Strip query params before logging: `urllib.parse.urlunparse(parsed._replace(query=""))` where `parsed = urlparse(upload_url)`.
How can I resolve this? If you propose a fix, please make it concise.
Summary
Adds an operational surface for bounded retention of task chat content in shared infrastructure. Callers can snapshot a task's content, delete it from the shared stores, and later restore it byte-identically — preserving message IDs and timestamps so tool-call and reasoning references between messages stay valid.
Snapshots can be transported either inline as JSON or via caller-supplied presigned URLs, so very long conversations don't bump into proxy/ALB body-size limits.
New endpoints
GET/tasks/{task_id}/exportPOST/tasks/{task_id}/exportPOST/tasks/{task_id}/cleanmessages, Mongotask_states, Postgresevents; resetsagent_task_tracker.last_processed_event_id; setstasks.cleaned_at.POST/tasks/{task_id}/rehydratecleaned_at. Accepts either inline content (messages+task_states) or a presignedsnapshot_urlto download from — mutually exclusive.Design notes
Two transport modes for the snapshot
For modest conversations, inline JSON works fine and is the simplest path. For very long conversations (deep reasoning content, extensive tool traces, many attachments), the snapshot can exceed nginx's 1 MB default
client_max_body_sizeor AWS ALB's 1 MB body cap. The URL transport sidesteps both:snapshot_url(presigned GET); Agentex downloads, parses asTaskSnapshotEntity, and runs the same insert path as inline.The existing inline paths are unchanged. Pydantic validators enforce inline-XOR-url on rehydrate; mixed payloads are rejected.
SSRF guard
Both URL paths are vulnerable to SSRF if unguarded — a malicious or careless caller could point Agentex at internal services, cloud metadata (
169.254.169.254), or private network ranges.utils/url_validation.pyimplements a deny-known-bad guard: requireshttps, resolves the hostname asynchronously, and rejects any IP that is private, loopback, link-local, reserved, multicast, or unspecified. Usesloop.getaddrinfoso it doesn't block the event loop.The guard has a known limitation: DNS rebinding (returning a public IP for the validation check and a private IP for the actual request) isn't mitigated. Acceptable for v1 given that callers are authenticated and authorized — the threat model is operator misconfiguration, not active attack.
Cleanup design
TaskRetentionService— both the HTTP routes and the (future) scheduled Temporal sweep workflow will call the same service methods, so the cleanup path is exercised by the same code in both contexts.task_id) first, then Postgres operations, thentasks.cleaned_atlast. A retry after partial failure converges because each step is idempotent andcleaned_atis the gate that keeps subsequent runs from re-doing work.clean:status == RUNNING(regardless offorce=true).idle_days(default 7) unlessforce=true.events_deleted > 0on an idle-checked task is a signal).clean,rehydrate, andexport_to_urloperations emit structured log lines (task_cleanup_completed,task_rehydrate_completed,task_export_to_url_completed) with the result payload. Datadog log search is the forensic trail.tasks.paramsis out of scope for v1 — not exported, not stripped during cleanup, not restored. If it turns out to carry chat content for specific agents, follow up.Schema change
A single nullable column on
tasks:This is a metadata-only ALTER (Postgres ≥11 doesn't rewrite the table). Falls within the project's safe-migration shape — passes the migration safety linter.
Other changes
adapter_mongodb.batch_createnow translates pymongoBulkWriteErrorcontaining only duplicate-key sub-errors (code 11000) intoDuplicateItemError(HTTP 400). Previously it fell through to the genericExceptionhandler and surfaced as HTTP 500. Narrowly scoped — non-duplicate bulk-write errors still surface asServiceError.EventRepository.delete_by_task_id(task_id) → intAgentTaskTrackerRepository.reset_cursors_for_task(task_id) → intTaskRetentionServiceinjectsDHttpxClientfor upload/download.Tests
21 integration tests in
tests/integration/api/task_retention/test_task_retention_api.pycovering:Suite runs in ~24s via testcontainers and passes.
Test plan
make test FILE=tests/integration/api/task_retention/taskstable (metadata-only, should be instant)GET /export→ save snapshot →POST /clean→POST /rehydratewith the snapshot → re-export → diffPOST /exportto it, generate a presigned GET URL for the uploaded object,POST /clean, thenPOST /rehydratewithsnapshot_urlpointing at the GET URL, then re-export → diffcleaned_atsurfaces inGET /tasks/{id}responsesFollow-ups (not in this PR)
clean_taskon a daily sweepclean(currently a known narrow race withforce=true)tasks.paramscontent stripping if it proves to carry chat content for any agentGreptile Summary
This PR adds a bounded-retention surface for task chat content: callers can snapshot, delete, and byte-identically restore a task's messages and states via four new endpoints (
GET/POST /export,POST /clean,POST /rehydrate), backed by a newTaskRetentionServicethat drives the same logic for both the HTTP admin surface and a future Temporal sweep workflow.tasks.cleaned_at), and ordered batch-insert restore with caller-supplied ID preservation — all with precondition guards (RUNNING check, idle-threshold, unprocessed-events check, cleaned-state gate).BulkWriteError→DuplicateItemErrortranslation in the MongoDB adapter for all-duplicate-key bulk write failures;cleaned_atnullable column ontasks; two new repository methods (delete_by_task_id,reset_cursors_for_task); 21 integration tests covering the round-trip invariant and all guards.Confidence Score: 4/5
Safe to merge after stripping the presigned URL from the audit log line in export_task_to_url; all other logic is well-guarded and thoroughly tested.
The task_export_to_url_completed structured log writes the full presigned PUT URL — including the embedded HMAC signature and credential — to Datadog. Any Datadog user can extract and reuse the URL within its validity window to overwrite the object storage object. The remaining changes (SSRF guard, idempotent multi-store cleanup, ID-preserving rehydrate, adapter error translation) are solid and covered by 21 integration tests.
agentex/src/domain/services/task_retention_service.py — the export_task_to_url audit log at line 165 includes upload_url verbatim.
Security Review
task_retention_service.pyline 165): Thetask_export_to_url_completedstructured log emitsupload_urlverbatim. Presigned PUT URLs for S3/GCS embed the access key, expiry, and HMAC signature in query parameters. Any user with Datadog log access can reuse the URL to write to the object storage location within the URL's validity window. The fix is to strip query parameters before logging (log only scheme + host + path).url_validation.pyresolves the hostname at validation time but the subsequent HTTP request uses the original hostname. A DNS rebinding attack can return a public IP during validation and a private IP for the actual request. This is explicitly noted as an accepted limitation for v1 given authenticated callers.Important Files Changed
Sequence Diagram
sequenceDiagram participant Caller participant Route as task_retention.py participant UC as TaskRetentionUseCase participant Svc as TaskRetentionService participant Mongo as MongoDB participant PG as PostgreSQL participant Store as Object Storage Note over Caller,Store: Export (inline) Caller->>Route: "GET /tasks/{id}/export" Route->>UC: export_task(task_id) UC->>Svc: export_task(task_id) Svc->>PG: task_repository.get(id) Svc->>Mongo: paginated messages (asc created_at) Svc->>Mongo: paginated task_states (asc created_at) Svc-->>Caller: TaskSnapshotEntity JSON Note over Caller,Store: Export to URL Caller->>Route: "POST /tasks/{id}/export {upload_url}" Route->>UC: export_task_to_url(task_id, upload_url) UC->>Svc: export_task_to_url(...) Svc->>Svc: validate_external_url (SSRF guard) Svc->>Svc: export_task snapshot Svc->>Store: PUT snapshot JSON (presigned URL) Svc-->>Caller: "{uploaded_bytes, counts}" Note over Caller,Store: Clean Caller->>Route: "POST /tasks/{id}/clean {force, idle_days}" Route->>UC: clean_task(task_id, force, idle_days) UC->>Svc: "clean_task(enforce_idle_threshold=!force)" Svc->>PG: task_repository.get check cleaned_at status Svc->>Svc: _is_task_idle / _has_unprocessed_events Svc->>Mongo: delete messages by task_id Svc->>Mongo: delete task_states by task_id Svc->>PG: delete events by task_id Svc->>PG: reset tracker cursors Svc->>PG: "tasks.cleaned_at = now()" Svc-->>Caller: TaskCleanupResultEntity Note over Caller,Store: Rehydrate (inline or URL) Caller->>Route: "POST /tasks/{id}/rehydrate {snapshot or snapshot_url}" Route->>UC: rehydrate_task(task_id, snapshot or url) UC->>Svc: rehydrate_task(...) alt snapshot_url provided Svc->>Svc: validate_external_url (SSRF guard) Svc->>Store: GET snapshot JSON (presigned URL) Svc->>Svc: parse TaskSnapshotEntity end Svc->>Svc: validate task_id per-entity task_id Svc->>PG: task_repository.get assert cleaned_at NOT NULL Svc->>Mongo: batch_create messages (caller IDs preserved) Svc->>Mongo: batch_create task_states (caller IDs preserved) Svc->>PG: "tasks.cleaned_at = NULL" Svc-->>Caller: 204 No ContentPrompt To Fix All With AI
Reviews (5): Last reviewed commit: "address review: reject empty rehydrate p..." | Re-trigger Greptile