Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
292 changes: 232 additions & 60 deletions backend/app/services/demo_seeding.py

Large diffs are not rendered by default.

157 changes: 151 additions & 6 deletions backend/app/services/demo_ubi_seed.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@

from __future__ import annotations

import asyncio
import json
import logging
from dataclasses import asdict
Expand All @@ -35,9 +36,26 @@
import httpx

from backend.app.domain.demo.synthetic_ubi import UbiEventRow, UbiQueryRow
from backend.app.scripts.seed_solr_products import (
_bulk_index_products as _solr_bulk_index,
)
from backend.app.scripts.seed_solr_products import (
_ensure_collection as _solr_ensure_collection,
)

logger = logging.getLogger(__name__)

# Engine-type discriminator. ES + OpenSearch share the same `_bulk` NDJSON +
# PUT-index path (the default branch); Solr builds collections from configsets
# and indexes via the JSON update handler. Kept as a module constant so the
# branch is a single named comparison, not a repeated string literal.
_SOLR_ENGINE: Final[str] = "solr"

# Solr's `relyloop_ubi` configset (docker/solr/configsets/relyloop_ubi/conf/).
# Both UBI collections are created from it — same configset the local
# `seed_solr_products` CLI uses (seed_solr_products.py:155-157).
_SOLR_UBI_CONFIGSET: Final[str] = "relyloop_ubi"


class DemoUbiSeedError(RuntimeError):
"""Raised on unrecoverable engine errors during UBI bulk write.
Expand Down Expand Up @@ -89,18 +107,52 @@ def _httpx_basic_auth(host_auth: tuple[str, str]) -> httpx.BasicAuth:
return httpx.BasicAuth(*host_auth)


def _ensure_solr_ubi_collections_sync(
engine_base_url: str,
host_auth: tuple[str, str],
) -> None:
"""Blocking Solr UBI-collection ensure (runs in :func:`asyncio.to_thread`).

Reuses the canonical sync ``_ensure_collection`` from
``seed_solr_products`` — it uploads the ``relyloop_ubi`` configset to
ZooKeeper, then CREATEs each collection (idempotent on "already
exists"). Mirrors ``seed_solr_products.py:155-157`` exactly.
"""
with httpx.Client(base_url=engine_base_url, timeout=30.0, auth=host_auth) as client:
_solr_ensure_collection(client, _INDEX_QUERIES, _SOLR_UBI_CONFIGSET)
_solr_ensure_collection(client, _INDEX_EVENTS, _SOLR_UBI_CONFIGSET)


async def ensure_ubi_indices(
*,
engine_client: httpx.AsyncClient,
engine_base_url: str,
host_auth: tuple[str, str],
engine_type: str,
mapping_path: Path = _MAPPING_PATH,
) -> None:
"""Create the two UBI indices with the canonical mapping.

Idempotent: tolerates ``HTTP 400 resource_already_exists_exception``.
Any other non-2xx raises :class:`DemoUbiSeedError`.
"""Create the two UBI indices/collections.

* ES / OpenSearch: ``PUT /{index}`` with the canonical mapping from
``samples/ubi_index_mappings.json``. Idempotent — tolerates
``HTTP 400 resource_already_exists_exception``.
* Solr: CREATE ``ubi_queries`` + ``ubi_events`` from the
``relyloop_ubi`` configset (reuses ``seed_solr_products``'s sync
``_ensure_collection`` via ``asyncio.to_thread``). Idempotent —
``_ensure_collection`` treats "already exists" as success.

Any unrecoverable engine error raises :class:`DemoUbiSeedError` with a
``ubi_seed/ensure_indices/{collection}: ...`` prefix.
"""
if engine_type == _SOLR_ENGINE:
try:
await asyncio.to_thread(_ensure_solr_ubi_collections_sync, engine_base_url, host_auth)
except httpx.HTTPError as exc:
raise DemoUbiSeedError(
f"ubi_seed/ensure_indices/{_INDEX_QUERIES}+{_INDEX_EVENTS}: {exc}"
) from exc
return

mappings = _load_mappings(mapping_path)
auth = _httpx_basic_auth(host_auth)
for index_name in (_INDEX_QUERIES, _INDEX_EVENTS):
Expand Down Expand Up @@ -145,6 +197,65 @@ def _normalize_application(
return out


def _to_solr_date(value: object) -> object:
"""Normalize an ISO-8601 timestamp to Solr's ``DatePointField`` format.

The synthetic-UBI generator stamps rows via ``datetime.isoformat()``,
which renders the UTC offset as ``+00:00``. Solr's ``DatePointField``
only accepts the canonical ``...Z`` form (and rejects ``+00:00`` with
"Invalid Date String"). ES/OpenSearch's ``date`` type tolerates both,
so this conversion is Solr-only. Non-string / non-``+00:00`` values
pass through untouched.
"""
if isinstance(value, str) and value.endswith("+00:00"):
return value[: -len("+00:00")] + "Z"
return value


def _to_solr_docs(rows: list[dict[str, object]], collection: str) -> list[dict[str, object]]:
"""Shape normalized UBI rows into Solr update docs.

The ``relyloop_ubi`` configset declares ``id`` as a required
``uniqueKey`` and the UBI rows carry no ``id`` (the ES path uses
``{"index": {}}`` auto-id). Synthesize a stable, collision-resistant
``id`` per doc (``{collection}-{ordinal}``) and rewrite the
``timestamp`` to Solr's ``...Z`` date form. Every other field name
already matches the configset schema (query_id / user_query /
application / action_name / object_id / position / dwell_seconds).
"""
out: list[dict[str, object]] = []
for ordinal, row in enumerate(rows):
# Include ``application`` (unique per scenario) in the synthesized id —
# ``ubi_queries`` / ``ubi_events`` are SHARED collections across all
# scenarios, so a bare ``{collection}-{ordinal}`` would collide (and
# silently overwrite) when a second Solr scenario seeds into them.
app = row.get("application", "default")
doc: dict[str, object] = {"id": f"{collection}-{app}-{ordinal}", **row}
if "timestamp" in doc:
doc["timestamp"] = _to_solr_date(doc["timestamp"])
out.append(doc)
return out


def _seed_solr_ubi_sync(
engine_base_url: str,
host_auth: tuple[str, str],
queries_docs: list[dict[str, object]],
events_docs: list[dict[str, object]],
) -> None:
"""Blocking Solr UBI write (runs in :func:`asyncio.to_thread`).

Reuses ``seed_solr_products``'s sync ``_bulk_index_products`` — it
``POST``s the doc list to ``/solr/{collection}/update?commit=true``
with a JSON body (NOT ES ``_bulk`` NDJSON). One commit per collection.
"""
with httpx.Client(base_url=engine_base_url, timeout=30.0, auth=host_auth) as client:
if queries_docs:
_solr_bulk_index(client, _INDEX_QUERIES, queries_docs)
if events_docs:
_solr_bulk_index(client, _INDEX_EVENTS, events_docs)


def _build_bulk_ndjson(rows: list[dict[str, object]]) -> str:
"""Alternate ``{"index": {}}`` action lines with row payloads.

Expand All @@ -164,6 +275,7 @@ async def seed_synthetic_ubi(
engine_client: httpx.AsyncClient,
engine_base_url: str,
host_auth: tuple[str, str],
engine_type: str,
scenario_slug: str,
target_application: str,
queries: list[UbiQueryRow],
Expand All @@ -173,10 +285,14 @@ async def seed_synthetic_ubi(

Returns the number of events written.

* ES / OpenSearch: ``POST /{index}/_bulk`` NDJSON (auto-id rows).
* Solr: ``POST /solr/{collection}/update?commit=true`` with a JSON
doc array (synthesized ``id`` uniqueKey, ``...Z`` timestamps).

Raises:
ValueError: if ``(scenario_slug, target_application)`` is not in
:data:`DEMO_UBI_SCENARIO_ALLOWLIST`.
DemoUbiSeedError: on engine ``_bulk`` HTTP errors.
DemoUbiSeedError: on engine write HTTP errors.
"""
pair = (scenario_slug, target_application)
if pair not in DEMO_UBI_SCENARIO_ALLOWLIST:
Expand All @@ -186,10 +302,39 @@ async def seed_synthetic_ubi(
f"DEMO_UBI_SCENARIO_ALLOWLIST"
)

auth = _httpx_basic_auth(host_auth)
queries_payload = _normalize_application(queries, target_application)
events_payload = _normalize_application(events, target_application)

if engine_type == _SOLR_ENGINE:
queries_docs = _to_solr_docs(queries_payload, _INDEX_QUERIES)
events_docs = _to_solr_docs(events_payload, _INDEX_EVENTS)
try:
await asyncio.to_thread(
_seed_solr_ubi_sync,
engine_base_url,
host_auth,
queries_docs,
events_docs,
)
except httpx.HTTPError as exc:
# Keep the bulk_write/{collection} prefix uniform with the ES
# path so the orchestrator's attribution + 503 stays the same.
# httpx.HTTPStatusError carries the failing request URL; fall
# back to the queries collection when it's a transport error
# (httpx.HTTPError.request raises RuntimeError when unset, so
# guard the access rather than relying on a default).
collection = _INDEX_QUERIES
try:
request_url = str(exc.request.url)
except RuntimeError:
request_url = ""
if f"/solr/{_INDEX_EVENTS}/" in request_url:
collection = _INDEX_EVENTS
raise DemoUbiSeedError(f"ubi_seed/bulk_write/{collection}: {exc}") from exc
return len(events)

auth = _httpx_basic_auth(host_auth)

for index_name, payload in (
(_INDEX_QUERIES, queries_payload),
(_INDEX_EVENTS, events_payload),
Expand Down
Loading
Loading