Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 23 additions & 6 deletions backend/app/api/v1/_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
from typing import Annotated, Any

from arq.connections import ArqRedis
from arq.constants import result_key_prefix
from fastapi import APIRouter, Body, Depends, HTTPException, Request, Response, status
from pydantic import BaseModel, ConfigDict, Field
from redis.asyncio import Redis
Expand Down Expand Up @@ -79,6 +80,12 @@
# should never appear in operator scripts.
_TEST_PREFIX = "/_test"

# Deterministic Arq job id for the demo reseed — one in-flight run at a time
# (rapid-double-click protection). Used both to enqueue and to clear the stale
# result key that would otherwise block a legitimate retry within keep_result
# (1h). See the reseed POST handler. bug_reseed_failure_blocks_retry_arq_singleton_dedup.
_RESEED_JOB_ID = "demo_reseed:singleton"


def _err(status_code: int, code: str, message: str, retryable: bool) -> HTTPException:
"""Canonical error-envelope shape — mirrors ``studies.py:74-78``.
Expand Down Expand Up @@ -727,11 +734,21 @@ async def reseed_demo(
)
await status_set(arq_pool, initial)

# Deterministic job id — Arq drops duplicate enqueues with the same
# _job_id within its dedup window (default 60s). A faster double-click
# gets one job; a slower retry after the previous run completed
# creates a fresh job (because Redis state has moved on).
#
# Deterministic job id — Arq aborts a duplicate enqueue with the same
# _job_id while EITHER the job is still in-flight (``arq:job:<id>``) OR a
# finished run's result is still cached (``arq:result:<id>``). The result
# is kept for ``keep_result`` seconds (Arq default 3600s = 1 HOUR), NOT a
# 60s window — so without the explicit clear below, a legitimate retry
# within an hour of a COMPLETED run is silently deduped (enqueue_job
# returns None) and the operator is stuck on "enqueued — waiting for
# worker" forever. The ``running``-status 409 guard above already proved
# no run is genuinely in-flight, so any lingering result is a stale
# completed/failed artifact that is safe to drop before re-enqueue. This
# preserves rapid-double-click protection (the first click's ``arq:job``
# key still dedupes the second) while unblocking deliberate retries.
# bug_reseed_failure_blocks_retry_arq_singleton_dedup.
await arq_pool.delete(f"{result_key_prefix}{_RESEED_JOB_ID}")

# ``engines`` is None when the body is absent OR ``{"engines": null}``
# OR ``{}`` (FastAPI parses an empty body to None when the body param
# has ``default=None``). All three are the "reseed every reachable
Expand All @@ -741,7 +758,7 @@ async def reseed_demo(
engines_filter = body.engines if body is not None else None
job = await arq_pool.enqueue_job(
"run_demo_reseed",
_job_id="demo_reseed:singleton",
_job_id=_RESEED_JOB_ID,
engines=engines_filter,
)
logger.info(
Expand Down
68 changes: 46 additions & 22 deletions backend/app/services/demo_seeding.py
Original file line number Diff line number Diff line change
Expand Up @@ -418,36 +418,60 @@ async def _emit_progress(status_callback: StatusCallback, progress: ReseedStatus
"http://localhost:8983": "http://solr:8983",
}

# The Compose-DNS *targets* of the mapping above. When
# ``scripts/seed_meaningful_demos.py`` is imported INSIDE a container
# (``_INSIDE_CONTAINER`` → ``/.dockerenv`` present), its ``ES`` / ``OS`` /
# ``SOLR`` constants — and therefore every scenario's ``host_base_url`` — are
# already these Compose-DNS URLs, not the host-shell ``localhost`` URLs. The
# worker reseed runs in-container, so it feeds these already-resolved values
# into :func:`_resolve_engine_base_url`; treating them as a no-op pass-through
# (rather than raising) is what makes the resolver idempotent.
# bug_reseed_resolve_engine_base_url_not_idempotent_in_container.
_COMPOSE_DNS_TARGETS: Final[frozenset[str]] = frozenset(_ENGINE_BASE_URL_MAPPING.values())


def _resolve_engine_base_url(host_base_url: str) -> str:
"""Map the CLI's host-shell URLs to in-container Compose DNS names.
"""Map an engine base URL to the in-container Compose-DNS name. Idempotent.

The imported :data:`SCENARIOS` constant from
``scripts/seed_meaningful_demos.py`` carries ``host_base_url`` values
like ``"http://localhost:9200"`` (ES), ``"http://localhost:9201"``
(OS), and ``"http://localhost:8983"`` (Solr) — correct from the host
shell, wrong from inside the API container where ``localhost`` is the
API itself. This function transparently maps to the Compose service
DNS names.

Pure / deterministic / no I/O. No env hooks (per cycle-4 plan review
A1 — AC-5's test injection lives in the test harness, not here).

Per FR-1d.
``scripts/seed_meaningful_demos.py`` carries ``host_base_url`` values that
depend on WHERE the script is imported:

- From the host shell: ``"http://localhost:9200"`` (ES) /
``":9201"`` (OS) / ``":8983"`` (Solr) — correct from the host, wrong
from inside the API container where ``localhost`` is the API itself.
These are mapped to the Compose service DNS names.
- From INSIDE a container (``_INSIDE_CONTAINER`` in the seed script —
``/.dockerenv`` present, which is ALWAYS true for the worker reseed):
the constants are already the Compose-DNS URLs (e.g.
``"http://elasticsearch:9200"``). Those pass through unchanged.

The pass-through (rather than raising on an already-resolved value) is what
makes this idempotent — required because the worker reseed runs in-container
and so feeds the already-Compose-DNS URLs here. (Pre-existing latent bug:
the home-button reseed's integration tests mock the engine probe layer, so
the real in-container resolve path was never exercised end-to-end —
bug_reseed_resolve_engine_base_url_not_idempotent_in_container.)

Pure / deterministic / no I/O.

Raises:
ValueError: when ``host_base_url`` is not one of the three
recognized CLI URLs. The orchestrator unwraps this to a
:class:`DemoSeedingError` so the route handler returns a
503 ``SEED_FAILED`` envelope.
ValueError: when ``host_base_url`` is neither a recognized host-shell
URL nor an already-resolved Compose-DNS target. The orchestrator
unwraps this to a :class:`DemoSeedingError` so the route handler
returns a 503 ``SEED_FAILED`` envelope.
"""
resolved = _ENGINE_BASE_URL_MAPPING.get(host_base_url)
if resolved is None:
raise ValueError(
f"Unrecognized engine host URL: {host_base_url}. "
f"Expected one of {sorted(_ENGINE_BASE_URL_MAPPING)}."
)
return resolved
if resolved is not None:
return resolved
# Idempotent pass-through for already-resolved Compose-DNS targets.
if host_base_url in _COMPOSE_DNS_TARGETS:
return host_base_url
Comment on lines 464 to +469

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

To make the URL resolution more robust and defensive against trailing slashes (which are common in URL configurations), consider stripping any trailing slashes from host_base_url before performing the lookup in _ENGINE_BASE_URL_MAPPING and _COMPOSE_DNS_TARGETS.

    normalized_url = host_base_url.rstrip("/")
    resolved = _ENGINE_BASE_URL_MAPPING.get(normalized_url)
    if resolved is not None:
        return resolved
    # Idempotent pass-through for already-resolved Compose-DNS targets.
    if normalized_url in _COMPOSE_DNS_TARGETS:
        return normalized_url

raise ValueError(
f"Unrecognized engine host URL: {host_base_url}. "
f"Expected a host URL {sorted(_ENGINE_BASE_URL_MAPPING)} "
f"or an already-resolved Compose-DNS URL {sorted(_COMPOSE_DNS_TARGETS)}."
)


# ---------------------------------------------------------------------------
Expand Down
16 changes: 16 additions & 0 deletions backend/tests/integration/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,11 @@ class SpyArqPool:
def __init__(self) -> None:
self.calls: list[tuple[object, ...]] = []
self._store: dict[object, object] = {}
# Keys passed to ``delete`` — the demo-reseed POST clears the stale
# Arq result key (``arq:result:<job_id>``) before re-enqueuing so a
# completed run's cached result can't dedup-block a legitimate retry
# (bug_reseed_failure_blocks_retry_arq_singleton_dedup).
self.deleted: list[object] = []

async def enqueue_job(self, name: str, *args: object, **kwargs: object) -> object:
self.calls.append((name, *args)) # flattened: (name,) + args
Expand All @@ -218,6 +223,17 @@ async def set(self, key: object, value: object, **kwargs: object) -> None:
# function-scoped test double.
self._store[key] = value

async def delete(self, *keys: object) -> int:
# Mirror ``redis.delete(*keys)``: drop each key from the in-memory
# store, record it for assertions, return the count that was present.
removed = 0
for key in keys:
self.deleted.append(key)
if key in self._store:
del self._store[key]
removed += 1
return removed


_UNSET: object = object()
"""Sentinel distinguishing "attr unset" from "attr is None"."""
Expand Down
37 changes: 37 additions & 0 deletions backend/tests/integration/test_demo_reseed_engines_filter.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,43 @@ async def test_post_with_null_engines_treats_as_all_engines(
assert response.status_code == 202, response.text


async def test_post_clears_stale_result_key_before_enqueue(
async_client: httpx.AsyncClient,
arq_pool_spy: object,
) -> None:
"""A prior run's cached Arq result must not dedup-block a retry.

bug_reseed_failure_blocks_retry_arq_singleton_dedup — Arq keeps a finished
job's result under ``arq:result:<job_id>`` for keep_result (default 1h) and
silently aborts a re-enqueue of the same ``_job_id`` while that key exists,
leaving the operator stuck on "enqueued — waiting for worker". The reseed
POST handler deletes that key before enqueuing so a deliberate retry after
a completed/failed run actually runs.
"""
from arq.constants import result_key_prefix

from backend.app.api.v1._test import _RESEED_JOB_ID

spy = arq_pool_spy
result_key = f"{result_key_prefix}{_RESEED_JOB_ID}"
# Simulate a completed prior run whose result is still cached in Redis.
spy._store[result_key] = b"stale-result" # type: ignore[attr-defined]

response = await async_client.post("/api/v1/_test/demo/reseed", json={})
assert response.status_code == 202, response.text

# The stale result key was deleted (so Arq won't dedup the enqueue) ...
assert result_key in spy.deleted, ( # type: ignore[attr-defined]
f"expected the stale result key to be cleared, deleted={spy.deleted!r}" # type: ignore[attr-defined]
)
assert result_key not in spy._store # type: ignore[attr-defined]
# ... and the job was actually enqueued (not deduped to None).
enqueued = [c for c in spy.calls if c[0] == "run_demo_reseed"] # type: ignore[attr-defined]
assert len(enqueued) == 1, (
f"expected exactly one run_demo_reseed enqueue, got {spy.calls!r}" # type: ignore[attr-defined]
)


async def test_post_with_no_body_accepted(
async_client: httpx.AsyncClient,
arq_pool_spy: object,
Expand Down
17 changes: 17 additions & 0 deletions backend/tests/unit/services/test_demo_seeding.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,23 @@ def test_resolve_engine_base_url_unknown_raises() -> None:
_resolve_engine_base_url("http://example.com:9200")


@pytest.mark.parametrize(
"compose_dns_url",
["http://elasticsearch:9200", "http://opensearch:9200", "http://solr:8983"],
)
def test_resolve_engine_base_url_is_idempotent_for_compose_dns(compose_dns_url: str) -> None:
"""An already-resolved Compose-DNS URL passes through unchanged.

bug_reseed_resolve_engine_base_url_not_idempotent_in_container — when
scripts/seed_meaningful_demos.py is imported INSIDE a container
(``_INSIDE_CONTAINER``), the SCENARIOS' ``host_base_url`` are already the
Compose-DNS URLs, and the worker reseed feeds them here. Before this fix
the resolver raised ``Unrecognized engine host URL: http://elasticsearch:9200``
and the whole reseed failed at the reachability snapshot.
"""
assert _resolve_engine_base_url(compose_dns_url) == compose_dns_url


# ---------------------------------------------------------------------------
# DEMO_RESEED_LOCK_KEY — deterministic derivation
# ---------------------------------------------------------------------------
Expand Down
2 changes: 1 addition & 1 deletion docs/00_overview/DASHBOARD.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ _Top-level index across MVP1 → GA v1+ as of **2026-06-18**. Click a release na
| Release | Theme | Progress | Status |
|---|---|---|---|
| [MVP1 / v0.1](MVP1_DASHBOARD.md) | The Loop | 100 / 100 scoped done | **Complete** |
| [MVP2 / v0.2](MVP2_DASHBOARD.md) | Three-Engine + Real Signals | 28 / 30 scoped done · 17 remaining | **In progress** |
| [MVP2 / v0.2](MVP2_DASHBOARD.md) | Three-Engine + Real Signals | 28 / 30 scoped done · 16 remaining | **In progress** |
| MVP3 / v0.3 | Observable | — | **Not yet scoped** |
| [GA v1 / v1.0](GA_DASHBOARD.md) | Production-ready | 1 item(s) queued | **Held / queued** |

Expand Down
Loading
Loading