From 1db3fe79ee139c696e4e41bb5d7a1e2f9fec8c60 Mon Sep 17 00:00:00 2001 From: Andreas Klos Date: Tue, 2 Sep 2025 07:59:59 +0200 Subject: [PATCH 01/15] feat: implement retry decorator with exponential backoff and rate limit handling --- .../impl/settings/retry_decorator_settings.py | 85 +++++++++ .../impl/utils/retry_decorator.py | 164 ++++++++++++++++++ .../src/rag_core_lib/impl/utils/utils.py | 71 ++++++++ 3 files changed, 320 insertions(+) create mode 100644 libs/rag-core-lib/src/rag_core_lib/impl/settings/retry_decorator_settings.py create mode 100644 libs/rag-core-lib/src/rag_core_lib/impl/utils/retry_decorator.py create mode 100644 libs/rag-core-lib/src/rag_core_lib/impl/utils/utils.py diff --git a/libs/rag-core-lib/src/rag_core_lib/impl/settings/retry_decorator_settings.py b/libs/rag-core-lib/src/rag_core_lib/impl/settings/retry_decorator_settings.py new file mode 100644 index 00000000..0338ff06 --- /dev/null +++ b/libs/rag-core-lib/src/rag_core_lib/impl/settings/retry_decorator_settings.py @@ -0,0 +1,85 @@ +"""Module contains settings regarding the STACKIT vLLM.""" + +from pydantic import Field, PositiveInt, model_validator +from pydantic_settings import BaseSettings, SettingsConfigDict + + +class RetryDecoratorSettings(BaseSettings): + """ + Contains settings regarding the retry decorator. + + Attributes + ---------- + max_retries : int (> 0) + Total retries (not counting the first attempt). + retry_base_delay : float (>= 0) + Base delay in seconds for the first retry. + retry_max_delay : float (> 0) + Maximum delay cap for any single wait. + backoff_factor : float (>= 1) + Exponential backoff factor. + attempt_cap : int (>= 0) + Cap for exponent growth (backoff_factor ** attempt_cap). + jitter_min : float (>= 0) + Minimum jitter to add to wait times. + jitter_max : float (>= jitter_min) + Maximum jitter to add to wait times. + """ + + # Pydantic v2 settings configuration + model_config = SettingsConfigDict(env_prefix="RETRY_DECORATOR_", case_sensitive=False) + + # Constrained fields + max_retries: PositiveInt = Field( + default=5, + title="Max Retries", + description="Total retries, not counting the initial attempt.", + ) + retry_base_delay: float = Field( + default=0.5, + ge=0, + title="Retry Base Delay", + description="Base delay in seconds for the first retry.", + ) + retry_max_delay: float = Field( + default=600.0, + gt=0, + title="Retry Max Delay", + description="Maximum delay cap in seconds for any single wait.", + ) + backoff_factor: float = Field( + default=2.0, + ge=1.0, + title="Backoff Factor", + description="Exponential backoff factor (>= 1).", + ) + attempt_cap: int = Field( + default=6, + ge=0, + title="Attempt Cap", + description="Cap for exponent growth (backoff_factor ** attempt_cap).", + ) + jitter_min: float = Field( + default=0.05, + ge=0.0, + title="Jitter Min (s)", + description="Minimum jitter in seconds.", + ) + jitter_max: float = Field( + default=0.25, + ge=0.0, + title="Jitter Max (s)", + description="Maximum jitter in seconds.", + ) + + @model_validator(mode="after") + def _check_relations(self) -> "RetryDecoratorSettings": + # Ensure jitter_max >= jitter_min + if self.jitter_max < self.jitter_min: + raise ValueError("jitter_max must be >= jitter_min") + # Ensure retry_max_delay is meaningful vs base + if self.retry_max_delay <= 0: + raise ValueError("retry_max_delay must be > 0") + if self.backoff_factor < 1: + raise ValueError("backoff_factor must be >= 1") + return self diff --git a/libs/rag-core-lib/src/rag_core_lib/impl/utils/retry_decorator.py b/libs/rag-core-lib/src/rag_core_lib/impl/utils/retry_decorator.py new file mode 100644 index 00000000..f2619d14 --- /dev/null +++ b/libs/rag-core-lib/src/rag_core_lib/impl/utils/retry_decorator.py @@ -0,0 +1,164 @@ +"""Reusable exponential backoff / retry decorator for sync and async functions.""" + +import asyncio +import inspect +import logging +import random +import time +from functools import wraps +from typing import Callable, Optional, ParamSpec, TypeVar + +from rag_core_lib.impl.settings.retry_decorator_settings import RetryDecoratorSettings +from rag_core_lib.impl.utils.utils import ( + headers_from_exception, + status_code_from_exception, + wait_from_rate_limit_headers, +) + + +# Use ParamSpec and TypeVar for type-safe decorators +P = ParamSpec("P") +R = TypeVar("R") + + +class _RetryEngine: + """Internal helper to keep retry logic small in the public API function.""" + + def __init__( + self, + cfg: RetryDecoratorSettings, + exceptions: tuple[type[BaseException], ...], + rate_limit_exceptions: tuple[type[BaseException], ...], + rate_limit_statuses: tuple[int, ...], + rate_limit_header_names: tuple[str, ...], + is_rate_limited: Optional[Callable[[BaseException], bool]], + logger: Optional[logging.Logger], + ) -> None: + self.cfg = cfg + self.exceptions = exceptions + self.rate_limit_exceptions = rate_limit_exceptions + self.rate_limit_statuses = rate_limit_statuses + self.rate_limit_header_names = rate_limit_header_names + self.is_rate_limited_cb = is_rate_limited + self.logger = logger + + def decorate(self, fn: Callable[P, R]) -> Callable[P, R]: + if inspect.iscoroutinefunction(fn): + + @wraps(fn) + async def async_wrapper(*args: P.args, **kwargs: P.kwargs) -> R: + for attempt in range(self.cfg.max_retries + 1): + try: + return await fn(*args, **kwargs) + except self.exceptions as exc: # type: ignore[misc] + wait_time = self._calculate_wait_time(attempt, exc) + if wait_time is None: + raise + await asyncio.sleep(wait_time) + raise AssertionError("Retry loop exited unexpectedly.") + + return async_wrapper + + @wraps(fn) + def sync_wrapper(*args: P.args, **kwargs: P.kwargs) -> R: + for attempt in range(self.cfg.max_retries + 1): + try: + return fn(*args, **kwargs) + except self.exceptions as exc: # type: ignore[misc] + wait_time = self._calculate_wait_time(attempt, exc) + if wait_time is None: + raise + time.sleep(wait_time) + raise AssertionError("Retry loop exited unexpectedly.") + + return sync_wrapper + + def _should_rate_limited(self, exc: BaseException) -> bool: + if self.is_rate_limited_cb and self.is_rate_limited_cb(exc): + return True + if isinstance(exc, self.rate_limit_exceptions): + return True + status_code = status_code_from_exception(exc) + if status_code in self.rate_limit_statuses: + return True + msg = str(exc).lower() + return ("rate limit" in msg) or ("ratelimit" in msg) + + def _compute_backoff_wait(self, attempt: int) -> float: + delay = self.cfg.retry_base_delay * (self.cfg.backoff_factor ** min(attempt, self.cfg.attempt_cap)) + return min(delay, self.cfg.retry_max_delay) + + def _with_jitter(self, seconds: float) -> float: + return min( + seconds + random.uniform(self.cfg.jitter_min, self.cfg.jitter_max), # noqa: S311 non-crypto jitter + self.cfg.retry_max_delay, + ) + + def _calculate_wait_time(self, attempt: int, exc: BaseException) -> float | None: + """Return wait seconds or None to re-raise.""" + total_attempts = self.cfg.max_retries + 1 + if attempt == self.cfg.max_retries: + if self.logger: + self.logger.error("Failed after %d attempts: %s", total_attempts, exc, exc_info=False) + return None + + if self._should_rate_limited(exc): + headers = headers_from_exception(exc) + wait = wait_from_rate_limit_headers(headers, self.rate_limit_header_names) + if wait is None: + wait = self._compute_backoff_wait(attempt) + final_wait = self._with_jitter(wait) + if self.logger: + self.logger.warning( + ( + "Rate limited. Remaining: req=%s tok=%s. Reset in: req=%s tok=%s. " + "Retrying in %.2fs (attempt %d/%d)..." + ), + headers.get("x-ratelimit-remaining-requests", "?"), + headers.get("x-ratelimit-remaining-tokens", "?"), + headers.get("x-ratelimit-reset-requests", "?"), + headers.get("x-ratelimit-reset-tokens", "?"), + final_wait, + attempt + 1, + total_attempts, + ) + return final_wait + + delay = self._compute_backoff_wait(attempt) + if self.logger: + self.logger.warning( + "Attempt %d/%d failed: %s. Retrying in %.2fs...", + attempt + 1, + total_attempts, + exc, + delay, + exc_info=False, + ) + return delay + + +def retry_with_backoff( + *, + settings: RetryDecoratorSettings | None = None, + exceptions: tuple[type[BaseException], ...] = (Exception,), + rate_limit_exceptions: tuple[type[BaseException], ...] = (), + rate_limit_statuses: tuple[int, ...] = (429,), + rate_limit_header_names: tuple[str, ...] = ( + "x-ratelimit-reset-requests", + "x-ratelimit-reset-tokens", + ), + is_rate_limited: Optional[Callable[[BaseException], bool]] = None, + logger: Optional[logging.Logger] = None, +) -> Callable[[Callable[P, R]], Callable[P, R]]: + """Apply robust retry logic with exponential backoff and rate-limit awareness.""" + cfg = settings or RetryDecoratorSettings() + engine = _RetryEngine( + cfg=cfg, + exceptions=exceptions, + rate_limit_exceptions=rate_limit_exceptions, + rate_limit_statuses=rate_limit_statuses, + rate_limit_header_names=rate_limit_header_names, + is_rate_limited=is_rate_limited, + logger=logger, + ) + return engine.decorate diff --git a/libs/rag-core-lib/src/rag_core_lib/impl/utils/utils.py b/libs/rag-core-lib/src/rag_core_lib/impl/utils/utils.py new file mode 100644 index 00000000..f39f917b --- /dev/null +++ b/libs/rag-core-lib/src/rag_core_lib/impl/utils/utils.py @@ -0,0 +1,71 @@ +import re +from typing import Any, Iterable, Optional + + +def _to_seconds(v): + if v is None: + return None + try: + s = str(v).strip().lower() + # Support composite durations like "1h21m55s", as well as single-unit values + if any(u in s for u in ("h", "m", "s")): + total = 0.0 + for val, unit in re.findall(r"([0-9]+(?:\.[0-9]+)?)([hms])", s): + num = float(val) + if unit == "h": + total += num * 3600 + elif unit == "m": + total += num * 60 + else: # "s" + total += num + return total + # Fallback: plain number interpreted as seconds + return float(s) + except Exception: + return None + + +def _normalize_headers(raw_headers: Any) -> dict[str, str]: + """Return a lowercased dict[str, str] from httpx.Headers or mapping-like objects.""" + if not raw_headers: + return {} + try: + if hasattr(raw_headers, "items"): + items = list(raw_headers.items()) # works for dict-like and httpx.Headers + else: + items = list(dict(raw_headers).items()) + except Exception: + try: + items = list(dict(raw_headers).items()) + except Exception: + items = [] + out: dict[str, str] = {} + for k, v in items: + try: + out[str(k).lower()] = str(v) + except Exception: + continue + return out + + +def status_code_from_exception(exc: BaseException) -> Optional[int]: + resp = getattr(exc, "response", None) + return getattr(resp, "status_code", None) + + +def headers_from_exception(exc: BaseException) -> dict[str, str]: + resp = getattr(exc, "response", None) + raw = getattr(resp, "headers", None) + return _normalize_headers(raw) + + +def wait_from_rate_limit_headers( + headers: dict[str, str], + header_names: Iterable[str] = ("x-ratelimit-reset-requests", "x-ratelimit-reset-tokens"), +) -> Optional[float]: + candidates = [] + for name in header_names: + sec = _to_seconds(headers.get(name)) + if sec is not None: + candidates.append(sec) + return max(candidates) if candidates else None From cbdd2879b59263b2e928f2ff61cdb0a0be8293ea Mon Sep 17 00:00:00 2001 From: Andreas Klos Date: Tue, 2 Sep 2025 09:00:08 +0200 Subject: [PATCH 02/15] refactor: improve retry decorator by separating async and sync logic; normalize dict items in utils --- .../impl/utils/retry_decorator.py | 28 +++++++++++-------- .../src/rag_core_lib/impl/utils/utils.py | 14 +++++----- 2 files changed, 23 insertions(+), 19 deletions(-) diff --git a/libs/rag-core-lib/src/rag_core_lib/impl/utils/retry_decorator.py b/libs/rag-core-lib/src/rag_core_lib/impl/utils/retry_decorator.py index f2619d14..8be5bb0b 100644 --- a/libs/rag-core-lib/src/rag_core_lib/impl/utils/retry_decorator.py +++ b/libs/rag-core-lib/src/rag_core_lib/impl/utils/retry_decorator.py @@ -44,21 +44,25 @@ def __init__( def decorate(self, fn: Callable[P, R]) -> Callable[P, R]: if inspect.iscoroutinefunction(fn): + return self._decorate_async(fn) + return self._decorate_sync(fn) - @wraps(fn) - async def async_wrapper(*args: P.args, **kwargs: P.kwargs) -> R: - for attempt in range(self.cfg.max_retries + 1): - try: - return await fn(*args, **kwargs) - except self.exceptions as exc: # type: ignore[misc] - wait_time = self._calculate_wait_time(attempt, exc) - if wait_time is None: - raise - await asyncio.sleep(wait_time) - raise AssertionError("Retry loop exited unexpectedly.") + def _decorate_async(self, fn: Callable[P, R]) -> Callable[P, R]: + @wraps(fn) + async def async_wrapper(*args: P.args, **kwargs: P.kwargs) -> R: + for attempt in range(self.cfg.max_retries + 1): + try: + return await fn(*args, **kwargs) + except self.exceptions as exc: # type: ignore[misc] + wait_time = self._calculate_wait_time(attempt, exc) + if wait_time is None: + raise + await asyncio.sleep(wait_time) + raise AssertionError("Retry loop exited unexpectedly.") - return async_wrapper + return async_wrapper + def _decorate_sync(self, fn: Callable[P, R]) -> Callable[P, R]: @wraps(fn) def sync_wrapper(*args: P.args, **kwargs: P.kwargs) -> R: for attempt in range(self.cfg.max_retries + 1): diff --git a/libs/rag-core-lib/src/rag_core_lib/impl/utils/utils.py b/libs/rag-core-lib/src/rag_core_lib/impl/utils/utils.py index f39f917b..831e22cf 100644 --- a/libs/rag-core-lib/src/rag_core_lib/impl/utils/utils.py +++ b/libs/rag-core-lib/src/rag_core_lib/impl/utils/utils.py @@ -25,6 +25,11 @@ def _to_seconds(v): return None +def _normalize_dict_items(items: Iterable[Any]) -> dict[str, str]: + """Normalize dict items by converting keys and values to a consistent format.""" + return {str(k).lower(): str(v).lower() for k, v in items if k is not None and v is not None} + + def _normalize_headers(raw_headers: Any) -> dict[str, str]: """Return a lowercased dict[str, str] from httpx.Headers or mapping-like objects.""" if not raw_headers: @@ -39,13 +44,8 @@ def _normalize_headers(raw_headers: Any) -> dict[str, str]: items = list(dict(raw_headers).items()) except Exception: items = [] - out: dict[str, str] = {} - for k, v in items: - try: - out[str(k).lower()] = str(v) - except Exception: - continue - return out + + return _normalize_dict_items(items) def status_code_from_exception(exc: BaseException) -> Optional[int]: From 815ca8767034257d3a17e51fceaa1fa29fb75f89 Mon Sep 17 00:00:00 2001 From: Andreas Klos Date: Tue, 2 Sep 2025 09:24:24 +0200 Subject: [PATCH 03/15] test: add comprehensive tests for retry decorator with async and sync handling --- .../tests/retry_decorator_test.py | 178 ++++++++++++++++++ 1 file changed, 178 insertions(+) create mode 100644 libs/rag-core-lib/tests/retry_decorator_test.py diff --git a/libs/rag-core-lib/tests/retry_decorator_test.py b/libs/rag-core-lib/tests/retry_decorator_test.py new file mode 100644 index 00000000..d8745def --- /dev/null +++ b/libs/rag-core-lib/tests/retry_decorator_test.py @@ -0,0 +1,178 @@ +import asyncio +import logging +import time +from typing import Optional + +import pytest + +from rag_core_lib.impl.settings.retry_decorator_settings import RetryDecoratorSettings +from rag_core_lib.impl.utils.retry_decorator import retry_with_backoff + + +class DummyError(Exception): + pass + + +class RateLimitError(Exception): + def __init__(self, headers: Optional[dict[str, str]] = None, status_code: Optional[int] = None): + self.response = type("Resp", (), {"headers": headers or {}, "status_code": status_code})() + super().__init__("rate limit") + + +@pytest.mark.asyncio +async def test_async_success_first_try(): + calls = {"n": 0} + + @retry_with_backoff(settings=RetryDecoratorSettings(max_retries=2)) + async def fn(): + calls["n"] += 1 + return 42 + + assert await fn() == 42 + assert calls["n"] == 1 + + +@pytest.mark.asyncio +async def test_async_retries_then_success(monkeypatch): + calls = {"n": 0} + slept = [] + + async def fake_sleep(x): + slept.append(x) + + monkeypatch.setattr(asyncio, "sleep", fake_sleep) + + @retry_with_backoff(settings=RetryDecoratorSettings(max_retries=3, retry_base_delay=0.01)) + async def fn(): + calls["n"] += 1 + if calls["n"] < 3: + raise DummyError("boom") + return "ok" + + assert await fn() == "ok" + assert calls["n"] == 3 + # Expect at least two sleeps due to two failures + assert len(slept) >= 2 + + +def test_sync_success_first_try(): + calls = {"n": 0} + + @retry_with_backoff(settings=RetryDecoratorSettings(max_retries=2)) + def fn(): + calls["n"] += 1 + return 7 + + assert fn() == 7 + assert calls["n"] == 1 + + +def test_sync_retries_then_fail(monkeypatch): + calls = {"n": 0} + slept = [] + + def fake_sleep(x): + slept.append(x) + + monkeypatch.setattr(time, "sleep", fake_sleep) + + @retry_with_backoff(settings=RetryDecoratorSettings(max_retries=2, retry_base_delay=0.01)) + def fn(): + calls["n"] += 1 + raise DummyError("always") + + with pytest.raises(DummyError): + fn() + # 1 initial + 2 retries = 3 calls total + assert calls["n"] == 3 + # Two sleeps (after two failures) + assert len(slept) == 2 + + +@pytest.mark.asyncio +async def test_async_rate_limit_uses_header_wait(monkeypatch): + calls = {"n": 0} + slept = [] + + async def fake_sleep(x): + slept.append(x) + + monkeypatch.setattr(asyncio, "sleep", fake_sleep) + + headers = { + "x-ratelimit-reset-requests": "1.5s", + "x-ratelimit-remaining-requests": "0", + } + + @retry_with_backoff( + settings=RetryDecoratorSettings(max_retries=1, retry_base_delay=0.01), + rate_limit_exceptions=(RateLimitError,), + ) + async def fn(): + calls["n"] += 1 + if calls["n"] == 1: + raise RateLimitError(headers=headers, status_code=429) + return "ok" + + out = await fn() + assert out == "ok" + assert calls["n"] == 2 + # Should sleep roughly ~1.5s (+jitter). Verify >= 1.5 + assert any(x >= 1.5 for x in slept) + + +@pytest.mark.asyncio +async def test_is_rate_limited_callback(monkeypatch): + calls = {"n": 0} + slept = [] + + async def fake_sleep(x): + slept.append(x) + + monkeypatch.setattr(asyncio, "sleep", fake_sleep) + + def mark_rate_limited(exc: BaseException) -> bool: + return isinstance(exc, DummyError) + + @retry_with_backoff( + settings=RetryDecoratorSettings(max_retries=1, retry_base_delay=0.01), + is_rate_limited=mark_rate_limited, + ) + async def fn(): + calls["n"] += 1 + if calls["n"] == 1: + raise DummyError("treated as rate limited") + return "ok" + + out = await fn() + assert out == "ok" + assert calls["n"] == 2 + assert len(slept) == 1 + + +def test_sync_rate_limit_headers(monkeypatch): + calls = {"n": 0} + slept = [] + + def fake_sleep(x): + slept.append(x) + + monkeypatch.setattr(time, "sleep", fake_sleep) + + headers = { + "x-ratelimit-reset-requests": "2s", + } + + @retry_with_backoff( + settings=RetryDecoratorSettings(max_retries=1, retry_base_delay=0.01), + rate_limit_exceptions=(RateLimitError,), + ) + def fn(): + calls["n"] += 1 + if calls["n"] == 1: + raise RateLimitError(headers=headers, status_code=429) + return "ok" + + assert fn() == "ok" + assert calls["n"] == 2 + assert any(x >= 2.0 for x in slept) From cdb544560be03bbe6903eb9eba70fe19fb5da8f7 Mon Sep 17 00:00:00 2001 From: Andreas Klos Date: Tue, 2 Sep 2025 10:20:54 +0200 Subject: [PATCH 04/15] feat: add retry decorator configuration and update deployment templates --- infrastructure/rag/templates/_helpers.tpl | 4 + .../templates/admin-backend/deployment.yaml | 2 + .../rag/templates/backend/deployment.yaml | 2 + infrastructure/rag/templates/configmap.yaml | 9 ++ infrastructure/rag/values.yaml | 8 + .../impl/settings/retry_decorator_settings.py | 2 - .../tests/retry_decorator_test.py | 151 ++++++++---------- 7 files changed, 89 insertions(+), 89 deletions(-) diff --git a/infrastructure/rag/templates/_helpers.tpl b/infrastructure/rag/templates/_helpers.tpl index a7bd1acb..724bc05b 100644 --- a/infrastructure/rag/templates/_helpers.tpl +++ b/infrastructure/rag/templates/_helpers.tpl @@ -10,6 +10,10 @@ {{- printf "%s-usecase-configmap" .Release.Name | trunc 63 | trimSuffix "-" -}} {{- end -}} +{{- define "configmap.retryDecoratorName" -}} +{{- printf "%s-retry-decorator-configmap" .Release.Name | trunc 63 | trimSuffix "-" -}} +{{- end -}} + {{- define "secret.usecaseName" -}} {{- printf "%s-usecase-secret" .Release.Name | trunc 63 | trimSuffix "-" -}} {{- end -}} diff --git a/infrastructure/rag/templates/admin-backend/deployment.yaml b/infrastructure/rag/templates/admin-backend/deployment.yaml index 7dffb754..53797b8c 100644 --- a/infrastructure/rag/templates/admin-backend/deployment.yaml +++ b/infrastructure/rag/templates/admin-backend/deployment.yaml @@ -106,6 +106,8 @@ spec: name: {{ template "configmap.keyValueStoreName" . }} - configMapRef: name: {{ template "configmap.sourceUploaderName" . }} + - configMapRef: + name: {{ template "configmap.retryDecoratorName" . }} - secretRef: name: {{ template "secret.langfuseName" . }} - secretRef: diff --git a/infrastructure/rag/templates/backend/deployment.yaml b/infrastructure/rag/templates/backend/deployment.yaml index b0ddc827..c3dfbf44 100644 --- a/infrastructure/rag/templates/backend/deployment.yaml +++ b/infrastructure/rag/templates/backend/deployment.yaml @@ -131,6 +131,8 @@ spec: name: {{ template "configmap.fakeEmbedderName" . }} - configMapRef: name: {{ template "configmap.chatHistoryName" . }} + - configMapRef: + name: {{ template "configmap.retryDecoratorName" . }} - secretRef: name: {{ template "secret.langfuseName" . }} - secretRef: diff --git a/infrastructure/rag/templates/configmap.yaml b/infrastructure/rag/templates/configmap.yaml index b93f5029..46fde598 100644 --- a/infrastructure/rag/templates/configmap.yaml +++ b/infrastructure/rag/templates/configmap.yaml @@ -24,3 +24,12 @@ data: {{- range $key, $value := .Values.shared.envs.usecase }} {{ $key }}: {{ $value | quote }} {{- end }} +--- +apiVersion: v1 +kind: ConfigMap +metadata: + name: {{ template "configmap.retryDecoratorName" . }} +data: + {{- range $key, $value := .Values.shared.envs.retryDecorator }} + {{ $key }}: {{ $value | quote }} + {{- end }} diff --git a/infrastructure/rag/values.yaml b/infrastructure/rag/values.yaml index ac0e0160..fe9b81f9 100644 --- a/infrastructure/rag/values.yaml +++ b/infrastructure/rag/values.yaml @@ -441,6 +441,14 @@ shared: s3: S3_ENDPOINT: http://rag-minio:9000 S3_BUCKET: documents + retryDecorator: + RETRY_DECORATOR_MAX_RETRIES: "5" + RETRY_DECORATOR_RETRY_BASE_DELAY: "0.5" + RETRY_DECORATOR_RETRY_MAX_DELAY: "600" + RETRY_DECORATOR_BACKOFF_FACTOR: "2" + RETRY_DECORATOR_ATTEMPT_CAP: "6" + RETRY_DECORATOR_JITTER_MIN: "0.05" + RETRY_DECORATOR_JITTER_MAX: "0.25" usecase: diff --git a/libs/rag-core-lib/src/rag_core_lib/impl/settings/retry_decorator_settings.py b/libs/rag-core-lib/src/rag_core_lib/impl/settings/retry_decorator_settings.py index 0338ff06..b03ed556 100644 --- a/libs/rag-core-lib/src/rag_core_lib/impl/settings/retry_decorator_settings.py +++ b/libs/rag-core-lib/src/rag_core_lib/impl/settings/retry_decorator_settings.py @@ -26,10 +26,8 @@ class RetryDecoratorSettings(BaseSettings): Maximum jitter to add to wait times. """ - # Pydantic v2 settings configuration model_config = SettingsConfigDict(env_prefix="RETRY_DECORATOR_", case_sensitive=False) - # Constrained fields max_retries: PositiveInt = Field( default=5, title="Max Retries", diff --git a/libs/rag-core-lib/tests/retry_decorator_test.py b/libs/rag-core-lib/tests/retry_decorator_test.py index d8745def..9a039d0f 100644 --- a/libs/rag-core-lib/tests/retry_decorator_test.py +++ b/libs/rag-core-lib/tests/retry_decorator_test.py @@ -1,5 +1,4 @@ import asyncio -import logging import time from typing import Optional @@ -19,119 +18,109 @@ def __init__(self, headers: Optional[dict[str, str]] = None, status_code: Option super().__init__("rate limit") -@pytest.mark.asyncio -async def test_async_success_first_try(): - calls = {"n": 0} +@pytest.fixture +def counter(): + class C: + def __init__(self) -> None: + self.n = 0 - @retry_with_backoff(settings=RetryDecoratorSettings(max_retries=2)) - async def fn(): - calls["n"] += 1 - return 42 + def inc(self) -> int: + self.n += 1 + return self.n - assert await fn() == 42 - assert calls["n"] == 1 + return C() -@pytest.mark.asyncio -async def test_async_retries_then_success(monkeypatch): - calls = {"n": 0} - slept = [] +@pytest.fixture +def async_sleeps(monkeypatch): + sleeps: list[float] = [] async def fake_sleep(x): - slept.append(x) + sleeps.append(x) monkeypatch.setattr(asyncio, "sleep", fake_sleep) + return sleeps + + +@pytest.fixture +def sync_sleeps(monkeypatch): + sleeps: list[float] = [] + + def fake_sleep(x): + sleeps.append(x) + + monkeypatch.setattr(time, "sleep", fake_sleep) + return sleeps + +@pytest.mark.asyncio +async def test_async_success_first_try(counter): + @retry_with_backoff(settings=RetryDecoratorSettings(max_retries=2)) + async def fn(): + counter.inc() + return 42 + + assert await fn() == 42 + assert counter.n == 1 + + +@pytest.mark.asyncio +async def test_async_retries_then_success(counter, async_sleeps): @retry_with_backoff(settings=RetryDecoratorSettings(max_retries=3, retry_base_delay=0.01)) async def fn(): - calls["n"] += 1 - if calls["n"] < 3: + if counter.inc() < 3: raise DummyError("boom") return "ok" assert await fn() == "ok" - assert calls["n"] == 3 - # Expect at least two sleeps due to two failures - assert len(slept) >= 2 + assert counter.n == 3 + assert len(async_sleeps) >= 2 # two failures -> two sleeps -def test_sync_success_first_try(): - calls = {"n": 0} - +def test_sync_success_first_try(counter): @retry_with_backoff(settings=RetryDecoratorSettings(max_retries=2)) def fn(): - calls["n"] += 1 + counter.inc() return 7 assert fn() == 7 - assert calls["n"] == 1 - - -def test_sync_retries_then_fail(monkeypatch): - calls = {"n": 0} - slept = [] + assert counter.n == 1 - def fake_sleep(x): - slept.append(x) - - monkeypatch.setattr(time, "sleep", fake_sleep) +def test_sync_retries_then_fail(counter, sync_sleeps): @retry_with_backoff(settings=RetryDecoratorSettings(max_retries=2, retry_base_delay=0.01)) def fn(): - calls["n"] += 1 + counter.inc() raise DummyError("always") with pytest.raises(DummyError): fn() - # 1 initial + 2 retries = 3 calls total - assert calls["n"] == 3 - # Two sleeps (after two failures) - assert len(slept) == 2 + assert counter.n == 3 # 1 initial + 2 retries + assert len(sync_sleeps) == 2 @pytest.mark.asyncio -async def test_async_rate_limit_uses_header_wait(monkeypatch): - calls = {"n": 0} - slept = [] - - async def fake_sleep(x): - slept.append(x) - - monkeypatch.setattr(asyncio, "sleep", fake_sleep) - - headers = { - "x-ratelimit-reset-requests": "1.5s", - "x-ratelimit-remaining-requests": "0", - } +async def test_async_rate_limit_uses_header_wait(counter, async_sleeps): + headers = {"x-ratelimit-reset-requests": "1.5s", "x-ratelimit-remaining-requests": "0"} @retry_with_backoff( settings=RetryDecoratorSettings(max_retries=1, retry_base_delay=0.01), rate_limit_exceptions=(RateLimitError,), ) async def fn(): - calls["n"] += 1 - if calls["n"] == 1: + if counter.inc() == 1: raise RateLimitError(headers=headers, status_code=429) return "ok" out = await fn() assert out == "ok" - assert calls["n"] == 2 - # Should sleep roughly ~1.5s (+jitter). Verify >= 1.5 - assert any(x >= 1.5 for x in slept) + assert counter.n == 2 + assert any(x >= 1.5 for x in async_sleeps) @pytest.mark.asyncio -async def test_is_rate_limited_callback(monkeypatch): - calls = {"n": 0} - slept = [] - - async def fake_sleep(x): - slept.append(x) - - monkeypatch.setattr(asyncio, "sleep", fake_sleep) - - def mark_rate_limited(exc: BaseException) -> bool: +async def test_is_rate_limited_callback(counter, async_sleeps): + def mark_rate_limited(exc: BaseException) -> bool: # noqa: ANN001 - explicit for clarity return isinstance(exc, DummyError) @retry_with_backoff( @@ -139,40 +128,28 @@ def mark_rate_limited(exc: BaseException) -> bool: is_rate_limited=mark_rate_limited, ) async def fn(): - calls["n"] += 1 - if calls["n"] == 1: + if counter.inc() == 1: raise DummyError("treated as rate limited") return "ok" out = await fn() assert out == "ok" - assert calls["n"] == 2 - assert len(slept) == 1 + assert counter.n == 2 + assert len(async_sleeps) == 1 -def test_sync_rate_limit_headers(monkeypatch): - calls = {"n": 0} - slept = [] - - def fake_sleep(x): - slept.append(x) - - monkeypatch.setattr(time, "sleep", fake_sleep) - - headers = { - "x-ratelimit-reset-requests": "2s", - } +def test_sync_rate_limit_headers(counter, sync_sleeps): + headers = {"x-ratelimit-reset-requests": "2s"} @retry_with_backoff( settings=RetryDecoratorSettings(max_retries=1, retry_base_delay=0.01), rate_limit_exceptions=(RateLimitError,), ) def fn(): - calls["n"] += 1 - if calls["n"] == 1: + if counter.inc() == 1: raise RateLimitError(headers=headers, status_code=429) return "ok" assert fn() == "ok" - assert calls["n"] == 2 - assert any(x >= 2.0 for x in slept) + assert counter.n == 2 + assert any(x >= 2.0 for x in sync_sleeps) From 72e2480cf72106dfcba305472e7a81d71f3f79d7 Mon Sep 17 00:00:00 2001 From: Andreas Klos Date: Tue, 2 Sep 2025 10:58:52 +0200 Subject: [PATCH 05/15] docs: add documentation for retry decorator with exponential backoff and configuration details --- libs/README.md | 51 ++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 51 insertions(+) diff --git a/libs/README.md b/libs/README.md index 53f7f2b5..e06214c1 100644 --- a/libs/README.md +++ b/libs/README.md @@ -18,6 +18,7 @@ It consists of the following python packages: - [3.3 Replaceable parts](#33-replaceable-parts) - [`4. RAG Core lib`](#4-rag-core-lib) - [4.1 Requirements](#41-requirements) + - [4.2 Retry decorator (exponential backoff)](#42-retry-decorator-exponential-backoff) With the exception of the `RAG Core lib` all of these packages contain an API definition and are easy to adjust for your specific use case. Each of the packages defines the replaceable parts([1.3 Replaceable Parts](#13-replaceable-parts), [2.3 Replaceable Parts](#23-replaceable-parts), [3.3 Replaceable Parts](#33-replaceable-parts)), expected types and offer a brief description. @@ -262,3 +263,53 @@ In addition to python libraries the following system packages are required: build-essential make ``` + +### 4.2 Retry decorator (exponential backoff) + +The `rag-core-lib` provides a reusable retry decorator with exponential backoff and rate‑limit awareness for both sync and async functions. + +- Module: `rag_core_lib.impl.utils.retry_decorator.retry_with_backoff` +- Settings: `rag_core_lib.impl.settings.retry_decorator_settings.RetryDecoratorSettings` +- Works with: synchronous and asynchronous callables +- Rate-limit aware: optionally inspects HTTP status 429 and headers like `x-ratelimit-reset-requests` / `x-ratelimit-reset-tokens` + +Usage example + +```python +from rag_core_lib.impl.utils.retry_decorator import retry_with_backoff +from rag_core_lib.impl.settings.retry_decorator_settings import RetryDecoratorSettings + +# Configure via code (env vars also supported, see below) +settings = RetryDecoratorSettings( + max_retries=3, + retry_base_delay=0.2, +) + +@retry_with_backoff(settings=settings) +def fetch_something(): + return "ok" + +@retry_with_backoff(settings=settings) +async def fetch_async_something(): + return "ok" +``` + +Configuration + +- Environment variables (prefix `RETRY_DECORATOR_`): + - `RETRY_DECORATOR_MAX_RETRIES` (default: 5) + - `RETRY_DECORATOR_RETRY_BASE_DELAY` (default: 0.5) + - `RETRY_DECORATOR_RETRY_MAX_DELAY` (default: 600) + - `RETRY_DECORATOR_BACKOFF_FACTOR` (default: 2) + - `RETRY_DECORATOR_ATTEMPT_CAP` (default: 6) + - `RETRY_DECORATOR_JITTER_MIN` (default: 0.05) + - `RETRY_DECORATOR_JITTER_MAX` (default: 0.25) + +- Helm chart (shared values): set the same keys under `shared.envs.retryDecorator` in [infrastructure/rag/values.yaml](../infrastructure/rag/values.yaml) to apply cluster‑wide defaults for backend/admin services. + +Advanced + +- Customize which exceptions trigger retries via `exceptions` and `rate_limit_exceptions` parameters of `retry_with_backoff()`. +- Header‑based wait: When rate‑limited, the decorator will honor reset headers if present and add jitter. + +For more examples, see tests in [./rag-core-lib/tests/retry_decorator_test.py](./rag-core-lib/tests/retry_decorator_test.py). From bde7a8ced735606c0fc4017059a6ad93291179c3 Mon Sep 17 00:00:00 2001 From: Andreas Klos Date: Tue, 2 Sep 2025 11:07:50 +0200 Subject: [PATCH 06/15] feat: add pytest-asyncio dependency for improved async testing support --- libs/rag-core-lib/poetry.lock | 21 ++++++++++++++++++++- libs/rag-core-lib/pyproject.toml | 1 + 2 files changed, 21 insertions(+), 1 deletion(-) diff --git a/libs/rag-core-lib/poetry.lock b/libs/rag-core-lib/poetry.lock index ce06b496..d711a98d 100644 --- a/libs/rag-core-lib/poetry.lock +++ b/libs/rag-core-lib/poetry.lock @@ -2913,6 +2913,25 @@ pygments = ">=2.7.2" [package.extras] dev = ["argcomplete", "attrs (>=19.2)", "hypothesis (>=3.56)", "mock", "requests", "setuptools", "xmlschema"] +[[package]] +name = "pytest-asyncio" +version = "0.26.0" +description = "Pytest support for asyncio" +optional = false +python-versions = ">=3.9" +groups = ["dev"] +files = [ + {file = "pytest_asyncio-0.26.0-py3-none-any.whl", hash = "sha256:7b51ed894f4fbea1340262bdae5135797ebbe21d8638978e35d31c6d19f72fb0"}, + {file = "pytest_asyncio-0.26.0.tar.gz", hash = "sha256:c4df2a697648241ff39e7f0e4a73050b03f123f760673956cf0d72a4990e312f"}, +] + +[package.dependencies] +pytest = ">=8.2,<9" + +[package.extras] +docs = ["sphinx (>=5.3)", "sphinx-rtd-theme (>=1)"] +testing = ["coverage (>=6.2)", "hypothesis (>=5.7.1)"] + [[package]] name = "python-dotenv" version = "1.1.1" @@ -3856,4 +3875,4 @@ cffi = ["cffi (>=1.11)"] [metadata] lock-version = "2.1" python-versions = "^3.13" -content-hash = "154d1d942e5fc55e3e24dbc03a58ca56dbe78f2c361024b72abb46af1353f1ec" +content-hash = "a48de2378937cd3543b4ee3616b9191f45414aa6637e14e8fdeb2b05d962b54b" diff --git a/libs/rag-core-lib/pyproject.toml b/libs/rag-core-lib/pyproject.toml index 086849f9..478f7dcc 100644 --- a/libs/rag-core-lib/pyproject.toml +++ b/libs/rag-core-lib/pyproject.toml @@ -28,6 +28,7 @@ langchain-openai = "^0.3.27" [tool.poetry.group.dev.dependencies] debugpy = "^1.8.14" pytest = "^8.3.5" +pytest-asyncio = "^0.26.0" coverage = "^7.5.4" flake8 = "^7.2.0" flake8-black = "^0.3.6" From 657715ce34708452c68bb7af4a0c7e0278c928e2 Mon Sep 17 00:00:00 2001 From: Andreas Klos Date: Wed, 24 Sep 2025 12:08:35 +0200 Subject: [PATCH 07/15] docs: Update libs/rag-core-lib/src/rag_core_lib/impl/settings/retry_decorator_settings.py Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- .../src/rag_core_lib/impl/settings/retry_decorator_settings.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/libs/rag-core-lib/src/rag_core_lib/impl/settings/retry_decorator_settings.py b/libs/rag-core-lib/src/rag_core_lib/impl/settings/retry_decorator_settings.py index b03ed556..b306af1b 100644 --- a/libs/rag-core-lib/src/rag_core_lib/impl/settings/retry_decorator_settings.py +++ b/libs/rag-core-lib/src/rag_core_lib/impl/settings/retry_decorator_settings.py @@ -1,4 +1,4 @@ -"""Module contains settings regarding the STACKIT vLLM.""" +"""Module contains settings for the retry decorator.""" from pydantic import Field, PositiveInt, model_validator from pydantic_settings import BaseSettings, SettingsConfigDict From f94ab4e27f3b4328de47ee4e6c56f3f3c9f8f9c0 Mon Sep 17 00:00:00 2001 From: Andreas Klos Date: Wed, 24 Sep 2025 12:09:01 +0200 Subject: [PATCH 08/15] chore: Update libs/rag-core-lib/src/rag_core_lib/impl/utils/retry_decorator.py Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- .../rag-core-lib/src/rag_core_lib/impl/utils/retry_decorator.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/libs/rag-core-lib/src/rag_core_lib/impl/utils/retry_decorator.py b/libs/rag-core-lib/src/rag_core_lib/impl/utils/retry_decorator.py index 8be5bb0b..8c4e94b7 100644 --- a/libs/rag-core-lib/src/rag_core_lib/impl/utils/retry_decorator.py +++ b/libs/rag-core-lib/src/rag_core_lib/impl/utils/retry_decorator.py @@ -98,7 +98,7 @@ def _with_jitter(self, seconds: float) -> float: self.cfg.retry_max_delay, ) - def _calculate_wait_time(self, attempt: int, exc: BaseException) -> float | None: + def _calculate_wait_time(self, attempt: int, exc: BaseException) -> Optional[float]: """Return wait seconds or None to re-raise.""" total_attempts = self.cfg.max_retries + 1 if attempt == self.cfg.max_retries: From 8acf200e9a97ec9e41943e5a823aaba4e0cde75f Mon Sep 17 00:00:00 2001 From: Andreas Klos Date: Thu, 9 Oct 2025 09:45:12 +0200 Subject: [PATCH 09/15] refactor: equip the langchain summarizer with a retry decorator. (#90) This pull request introduces enhanced configurability and reliability to the summarization workflow by adding granular retry and concurrency settings, and refactoring the summarizer to use them. The changes allow for more robust handling of transient failures and better control over resource usage. **Configuration enhancements:** * Added new retry-related fields (e.g., `max_retries`, `retry_base_delay`, `retry_max_delay`, `backoff_factor`, `attempt_cap`, `jitter_min`, `jitter_max`) to the `SummarizerSettings` class, allowing fine-grained control over retry behavior for summarization tasks. [[1]](diffhunk://#diff-ceade27a403894bb34e6c3c94bca8739203875d1037ce8896348b9eeb377dbcbL15-R31) [[2]](diffhunk://#diff-ceade27a403894bb34e6c3c94bca8739203875d1037ce8896348b9eeb377dbcbL26-R82) * Fixed a typo in the `SummarizerSettings` field name from `maximum_concurrreny` to `maximum_concurrency`. [[1]](diffhunk://#diff-ceade27a403894bb34e6c3c94bca8739203875d1037ce8896348b9eeb377dbcbL15-R31) [[2]](diffhunk://#diff-ceade27a403894bb34e6c3c94bca8739203875d1037ce8896348b9eeb377dbcbL26-R82) **Dependency injection and wiring:** * Registered `RetryDecoratorSettings` in the dependency container and passed both summarizer and global retry settings to the `LangchainSummarizer` instance, enabling summarizer-specific overrides. [[1]](diffhunk://#diff-8b7c1816cb3e0a40b7965721c550eefdc184c5d914ec023e36527255613381e7R67) [[2]](diffhunk://#diff-8b7c1816cb3e0a40b7965721c550eefdc184c5d914ec023e36527255613381e7R90) [[3]](diffhunk://#diff-8b7c1816cb3e0a40b7965721c550eefdc184c5d914ec023e36527255613381e7L139-R143) **Summarizer logic refactoring:** * Refactored the summarization logic in `LangchainSummarizer` to: - Use asynchronous chunk summarization with concurrency control via a semaphore. - Implement retry logic with exponential backoff and jitter for chunk summarization, using the new settings for configuration. - Cleaned up error handling and removed redundant retry code in favor of the new decorator-based approach. [[1]](diffhunk://#diff-9793b1081628436dd7d5a0e37abc9d79ee5e25af3f5e784f99379249809ed8dbR3-R21) [[2]](diffhunk://#diff-9793b1081628436dd7d5a0e37abc9d79ee5e25af3f5e784f99379249809ed8dbR39-R47) [[3]](diffhunk://#diff-9793b1081628436dd7d5a0e37abc9d79ee5e25af3f5e784f99379249809ed8dbL68-R161) These changes collectively improve the reliability, configurability, and maintainability of the summarization pipeline. **fixes partly following issue** https://github.com/stackitcloud/rag-template/issues/87 --------- Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- infrastructure/rag/values.yaml | 7 ++ libs/README.md | 38 ++++++++- .../src/admin_api_lib/dependency_container.py | 6 +- .../impl/settings/summarizer_settings.py | 62 +++++++++++++- .../impl/summarizer/langchain_summarizer.py | 82 +++++++++++++------ 5 files changed, 163 insertions(+), 32 deletions(-) diff --git a/infrastructure/rag/values.yaml b/infrastructure/rag/values.yaml index 6f6f4ee7..74c54ddc 100644 --- a/infrastructure/rag/values.yaml +++ b/infrastructure/rag/values.yaml @@ -319,6 +319,13 @@ adminBackend: summarizer: SUMMARIZER_MAXIMUM_INPUT_SIZE: "8000" SUMMARIZER_MAXIMUM_CONCURRENCY: "10" + SUMMARIZER_MAX_RETRIES: "5" + SUMMARIZER_RETRY_BASE_DELAY: "0.5" + SUMMARIZER_RETRY_MAX_DELAY: "600" + SUMMARIZER_BACKOFF_FACTOR: "2" + SUMMARIZER_ATTEMPT_CAP: "6" + SUMMARIZER_JITTER_MIN: "0.05" + SUMMARIZER_JITTER_MAX: "0.25" ragapi: RAG_API_HOST: "http://backend:8080" chunker: diff --git a/libs/README.md b/libs/README.md index e06214c1..f2b82e13 100644 --- a/libs/README.md +++ b/libs/README.md @@ -12,6 +12,7 @@ It consists of the following python packages: - [2.1 Requirements](#21-requirements) - [2.2 Endpoints](#22-endpoints) - [2.3 Replaceable parts](#23-replaceable-parts) + - [2.4 Summarizer retry behavior](#24-summarizer-retry-behavior) - [`3. Extractor API lib`](#3-extractor-api-lib) - [3.1 Requirements](#31-requirements) - [3.2 Endpoints](#32-endpoints) @@ -71,6 +72,7 @@ By default `OpenAI` is used by the evaluation. If you want to use the same LLM-c Endpoint to remove documents from the vector database. #### `/information_pieces/upload` + Endpoint to upload documents into the vector database. These documents need to have been parsed. For simplicity, a LangChain Documents like format is used. Uploaded documents are required to contain the following metadata: @@ -94,7 +96,7 @@ Uploaded documents are required to contain the following metadata: | chat_graph | [`rag_core_api.graph.graph_base.GraphBase`](./rag-core-api/src/rag_core_api/graph/graph_base.py) | [`rag_core_api.impl.graph.chat_graph.DefaultChatGraph`](./rag-core-api/src/rag_core_api/impl/graph/chat_graph.py) | Langgraph graph that contains the entire logic for question answering. | | traced_chat_graph | [`rag_core_lib.chains.async_chain.AsyncChain[Any, Any]`](./rag-core-lib/src/rag_core_lib/chains/async_chain.py)| [`rag_core_lib.impl.tracers.langfuse_traced_chain.LangfuseTracedGraph`](./rag-core-lib/src/rag_core_lib/impl/tracers/langfuse_traced_chain.py) | Wraps around the *chat_graph* and add langfuse tracing. | | evaluator | [`rag_core_api.impl.evaluator.langfuse_ragas_evaluator.LangfuseRagasEvaluator`](./rag-core-api/src/rag_core_api/impl/evaluator/langfuse_ragas_evaluator.py) | [`rag_core_api.impl.evaluator.langfuse_ragas_evaluator.LangfuseRagasEvaluator`](./rag-core-api/src/rag_core_api/impl/evaluator/langfuse_ragas_evaluator.py) | The evaulator used in the evaluate endpoint. | -| chat_endpoint | [ `rag_core_api.api_endpoints.chat.Chat`](./rag-core-api/src/rag_core_api/api_endpoints/chat.py) | [`rag_core_api.impl.api_endpoints.default_chat.DefaultChat`](./rag-core-api/src/rag_core_api/impl/api_endpoints/default_chat.py) | Implementation of the chat endpoint. Default implementation just calls the *traced_chat_graph* | +| chat_endpoint | [`rag_core_api.api_endpoints.chat.Chat`](./rag-core-api/src/rag_core_api/api_endpoints/chat.py) | [`rag_core_api.impl.api_endpoints.default_chat.DefaultChat`](./rag-core-api/src/rag_core_api/impl/api_endpoints/default_chat.py) | Implementation of the chat endpoint. Default implementation just calls the *traced_chat_graph* | | ragas_llm | `langchain_core.language_models.chat_models.BaseChatModel` | `langchain_openai.ChatOpenAI` or `langchain_ollama.ChatOllama` | The LLM used for the ragas evaluation. | ## 2. Admin API Lib @@ -115,7 +117,7 @@ The following endpoints are provided by the *admin-api-lib*: All required python libraries can be found in the [pyproject.toml](./admin-api-lib/pyproject.toml) file. In addition to python libraries, the following system packages are required: -``` +```shell build-essential make ``` @@ -157,10 +159,10 @@ The extracted information will be summarized using LLM. The summary, as well as | key_value_store | [`admin_api_lib.impl.key_db.file_status_key_value_store.FileStatusKeyValueStore`](./admin-api-lib/src/admin_api_lib/impl/key_db/file_status_key_value_store.py) | [`admin_api_lib.impl.key_db.file_status_key_value_store.FileStatusKeyValueStore`](./admin-api-lib/src/admin_api_lib/impl/key_db/file_status_key_value_store.py) | Is used for storing the available sources and their current state. | | chunker | [`admin_api_lib.chunker.chunker.Chunker`](./admin-api-lib/src/admin_api_lib/chunker/chunker.py) | [`admin_api_lib.impl.chunker.text_chunker.TextChunker`](./admin-api-lib/src/admin_api_lib/impl/chunker/text_chunker.py) | Used for splitting the documents in managable chunks. | | document_extractor | [`admin_api_lib.extractor_api_client.openapi_client.api.extractor_api.ExtractorApi`](./admin-api-lib/src/admin_api_lib/extractor_api_client/openapi_client/api/extractor_api.py) | [`admin_api_lib.extractor_api_client.openapi_client.api.extractor_api.ExtractorApi`](./admin-api-lib/src/admin_api_lib/extractor_api_client/openapi_client/api/extractor_api.py) | Needs to be replaced if adjustments to the `extractor-api` is made. | -| rag_api | [`admin_api_lib.rag_backend_client.openapi_client.api.rag_api.RagApi`](./admin-api-lib/src/admin_api_lib/rag_backend_client/openapi_client/api/rag_api.py) | [`admin_api_lib.rag_backend_client.openapi_client.api.rag_api.RagApi`](./admin-api-lib/src/admin_api_lib/rag_backend_client/openapi_client/api/rag_api.py) | Needs to be replaced if changes to the `/information_pieces/remove` or `/information_pieces/upload` of the [`rag-core-api`](#rag-core-api) are made. | +| rag_api | [`admin_api_lib.rag_backend_client.openapi_client.api.rag_api.RagApi`](./admin-api-lib/src/admin_api_lib/rag_backend_client/openapi_client/api/rag_api.py) | [`admin_api_lib.rag_backend_client.openapi_client.api.rag_api.RagApi`](./admin-api-lib/src/admin_api_lib/rag_backend_client/openapi_client/api/rag_api.py) | Needs to be replaced if changes to the `/information_pieces/remove` or `/information_pieces/upload` of the [`rag-core-api`](#1-rag-core-api) are made. | | summarizer_prompt | `str` | [`admin_api_lib.prompt_templates.summarize_prompt.SUMMARIZE_PROMPT`](./admin-api-lib/src/admin_api_lib/prompt_templates/summarize_prompt.py) | The prompt used of the summarization. | | langfuse_manager | [`rag_core_lib.impl.langfuse_manager.langfuse_manager.LangfuseManager`](./rag-core-lib/src/rag_core_lib/impl/langfuse_manager/langfuse_manager.py) | [`rag_core_lib.impl.langfuse_manager.langfuse_manager.LangfuseManager`](./rag-core-lib/src/rag_core_lib/impl/langfuse_manager/langfuse_manager.py) | Retrieves additional settings, as well as the prompt from langfuse if available. | -| summarizer | [`admin_api_lib.summarizer.summarizer.Summarizer`](./admin-api-lib/src/admin_api_lib/summarizer/summarizer.py) | [`admin_api_lib.impl.summarizer.langchain_summarizer.LangchainSummarizer`](./admin-api-lib/src/admin_api_lib/impl/summarizer/langchain_summarizer.py) | Creates the summaries. | +| summarizer | [`admin_api_lib.summarizer.summarizer.Summarizer`](./admin-api-lib/src/admin_api_lib/summarizer/summarizer.py) | [`admin_api_lib.impl.summarizer.langchain_summarizer.LangchainSummarizer`](./admin-api-lib/src/admin_api_lib/impl/summarizer/langchain_summarizer.py) | Creates the summaries. Uses the shared retry decorator with optional per-summarizer overrides (see 2.4). | | untraced_information_enhancer |[`admin_api_lib.information_enhancer.information_enhancer.InformationEnhancer`](./admin-api-lib/src/admin_api_lib/information_enhancer/information_enhancer.py) | [`admin_api_lib.impl.information_enhancer.general_enhancer.GeneralEnhancer`](./admin-api-lib/src/admin_api_lib/impl/information_enhancer/general_enhancer.py) | Uses the *summarizer* to enhance the extracted documents. | | information_enhancer | [`rag_core_lib.chains.async_chain.AsyncChain[Any, Any]`](./rag-core-lib/src/rag_core_lib/chains/async_chain.py)| [`rag_core_lib.impl.tracers.langfuse_traced_chain.LangfuseTracedGraph`](./rag-core-lib/src/rag_core_lib/impl/tracers/langfuse_traced_chain.py) |Wraps around the *untraced_information_enhancer* and adds langfuse tracing. | | document_deleter |[`admin_api_lib.api_endpoints.document_deleter.DocumentDeleter`](./admin-api-lib/src/admin_api_lib/api_endpoints/document_deleter.py) | [`admin_api_lib.impl.api_endpoints.default_document_deleter.DefaultDocumentDeleter`](./admin-api-lib/src/admin_api_lib/impl/api_endpoints/default_document_deleter.py) | Handles deletion of sources. | @@ -169,6 +171,32 @@ The extracted information will be summarized using LLM. The summary, as well as | document_reference_retriever | [`admin_api_lib.api_endpoints.document_reference_retriever.DocumentReferenceRetriever`](./admin-api-lib/src/admin_api_lib/api_endpoints/document_reference_retriever.py) | [`admin_api_lib.impl.api_endpoints.default_document_reference_retriever.DefaultDocumentReferenceRetriever`](./admin-api-lib/src/admin_api_lib/impl/api_endpoints/default_document_reference_retriever.py) | Handles return of files from connected storage. | | file_uploader | [`admin_api_lib.api_endpoints.file_uploader.FileUploader`](./admin-api-lib/src/admin_api_lib/api_endpoints/file_uploader.py) | [`admin_api_lib.impl.api_endpoints.default_file_uploader.DefaultFileUploader`](./admin-api-lib/src/admin_api_lib/impl/api_endpoints/default_file_uploader.py) | Handles upload and extraction of files. | +### 2.4 Summarizer retry behavior + +The default summarizer implementation (`LangchainSummarizer`) now uses the shared retry decorator with exponential backoff from the `rag-core-lib`. + +- Decorator: `rag_core_lib.impl.utils.retry_decorator.retry_with_backoff` +- Base settings (fallback): [`RetryDecoratorSettings`](./rag-core-lib/src/rag_core_lib/impl/settings/retry_decorator_settings.py) +- Per-summarizer overrides: [`SummarizerSettings`](./admin-api-lib/src/admin_api_lib/impl/settings/summarizer_settings.py) + +How it resolves settings + +- Each field in `SummarizerSettings` is optional. When a field is provided (not None), it overrides the corresponding value from `RetryDecoratorSettings`. +- When a field is not provided (None), the summarizer falls back to the value from `RetryDecoratorSettings`. + +Configuring via environment variables + +- Summarizer-specific (prefix `SUMMARIZER_`): + - `SUMMARIZER_MAX_RETRIES` + - `SUMMARIZER_RETRY_BASE_DELAY` + - `SUMMARIZER_RETRY_MAX_DELAY` + - `SUMMARIZER_BACKOFF_FACTOR` + - `SUMMARIZER_ATTEMPT_CAP` + - `SUMMARIZER_JITTER_MIN` + - `SUMMARIZER_JITTER_MAX` +- Global fallback (prefix `RETRY_DECORATOR_`): see section [4.2](#42-retry-decorator-exponential-backoff) for all keys and defaults. +- Helm chart: set the same keys under `adminBackend.envs.summarizer` in [infrastructure/rag/values.yaml](../infrastructure/rag/values.yaml). + ## 3. Extractor API Lib The Extractor Library contains components that provide document parsing capabilities for various file formats and web sources. It supports extracting content from PDF, DOCX, XML files, as well as web pages via sitemaps and Confluence pages. It also includes a default `dependency_container`, that is pre-configured and is a good starting point for most use-cases. This API should not be exposed by ingress and only used for internally. @@ -197,6 +225,7 @@ tesseract-ocr-eng ### 3.2 Endpoints #### `/extract_from_file` + This endpoint will extract the information from PDF,PTTX,WORD,XML files. It will load the files from the connected storage. The following types of information will be extracted: @@ -215,6 +244,7 @@ The following types of information can be extracted: - `IMAGE`: image found in the document For sitemap sources, additional parameters can be provided, e.g.: + - `web_path`: The URL of the XML sitemap to crawl - `filter_urls`: JSON array of URL patterns to filter pages (optional) - `header_template`: JSON object for custom HTTP headers (optional) diff --git a/libs/admin-api-lib/src/admin_api_lib/dependency_container.py b/libs/admin-api-lib/src/admin_api_lib/dependency_container.py index 798cda18..ee86cb5f 100644 --- a/libs/admin-api-lib/src/admin_api_lib/dependency_container.py +++ b/libs/admin-api-lib/src/admin_api_lib/dependency_container.py @@ -64,6 +64,7 @@ from rag_core_lib.impl.settings.langfuse_settings import LangfuseSettings from rag_core_lib.impl.settings.ollama_llm_settings import OllamaSettings from rag_core_lib.impl.settings.rag_class_types_settings import RAGClassTypeSettings +from rag_core_lib.impl.settings.retry_decorator_settings import RetryDecoratorSettings from rag_core_lib.impl.settings.stackit_vllm_settings import StackitVllmSettings from rag_core_lib.impl.tracers.langfuse_traced_runnable import LangfuseTracedRunnable from rag_core_lib.impl.utils.async_threadsafe_semaphore import AsyncThreadsafeSemaphore @@ -86,6 +87,7 @@ class DependencyContainer(DeclarativeContainer): key_value_store_settings = KeyValueSettings() summarizer_settings = SummarizerSettings() source_uploader_settings = SourceUploaderSettings() + retry_decorator_settings = RetryDecoratorSettings() key_value_store = Singleton(FileStatusKeyValueStore, key_value_store_settings) file_service = Singleton(S3Service, s3_settings=s3_settings) @@ -136,7 +138,9 @@ class DependencyContainer(DeclarativeContainer): LangchainSummarizer, langfuse_manager=langfuse_manager, chunker=summary_text_splitter, - semaphore=Singleton(AsyncThreadsafeSemaphore, summarizer_settings.maximum_concurrreny), + semaphore=Singleton(AsyncThreadsafeSemaphore, summarizer_settings.maximum_concurrency), + summarizer_settings=summarizer_settings, + retry_decorator_settings=retry_decorator_settings, ) summary_enhancer = List( diff --git a/libs/admin-api-lib/src/admin_api_lib/impl/settings/summarizer_settings.py b/libs/admin-api-lib/src/admin_api_lib/impl/settings/summarizer_settings.py index 3617adb8..b3138ce0 100644 --- a/libs/admin-api-lib/src/admin_api_lib/impl/settings/summarizer_settings.py +++ b/libs/admin-api-lib/src/admin_api_lib/impl/settings/summarizer_settings.py @@ -1,6 +1,7 @@ """Contains settings for summarizer.""" -from pydantic import Field +from typing import Optional +from pydantic import Field, PositiveInt from pydantic_settings import BaseSettings @@ -12,8 +13,22 @@ class SummarizerSettings(BaseSettings): ---------- maximum_input_size : int The maximum size of the input that the summarizer can handle. Default is 8000. - maximum_concurrreny : int + maximum_concurrency : int The maximum number of concurrent summarization processes. Default is 10. + max_retries: Optional[PositiveInt] + Total retries, not counting the initial attempt. + retry_base_delay: Optional[float] + Base delay in seconds for the first retry. + retry_max_delay: Optional[float] + Maximum delay cap in seconds for any single wait. + backoff_factor: Optional[float] + Exponential backoff factor (>= 1). + attempt_cap: Optional[int] + Cap for exponent growth (backoff_factor ** attempt_cap). + jitter_min: Optional[float] + Minimum jitter in seconds. + jitter_max: Optional[float] + Maximum jitter in seconds. """ class Config: @@ -23,4 +38,45 @@ class Config: case_sensitive = False maximum_input_size: int = Field(default=8000) - maximum_concurrreny: int = Field(default=10) + maximum_concurrency: int = Field(default=10) + max_retries: Optional[PositiveInt] = Field( + default=None, + title="Max Retries", + description="Total retries, not counting the initial attempt.", + ) + retry_base_delay: Optional[float] = Field( + default=None, + ge=0, + title="Retry Base Delay", + description="Base delay in seconds for the first retry.", + ) + retry_max_delay: Optional[float] = Field( + default=None, + gt=0, + title="Retry Max Delay", + description="Maximum delay cap in seconds for any single wait.", + ) + backoff_factor: Optional[float] = Field( + default=None, + ge=1.0, + title="Backoff Factor", + description="Exponential backoff factor (>= 1).", + ) + attempt_cap: Optional[int] = Field( + default=None, + ge=0, + title="Attempt Cap", + description="Cap for exponent growth (backoff_factor ** attempt_cap).", + ) + jitter_min: Optional[float] = Field( + default=None, + ge=0.0, + title="Jitter Min (s)", + description="Minimum jitter in seconds.", + ) + jitter_max: Optional[float] = Field( + default=None, + ge=0.0, + title="Jitter Max (s)", + description="Maximum jitter in seconds.", + ) diff --git a/libs/admin-api-lib/src/admin_api_lib/impl/summarizer/langchain_summarizer.py b/libs/admin-api-lib/src/admin_api_lib/impl/summarizer/langchain_summarizer.py index 1d5b5d09..0872ddaa 100644 --- a/libs/admin-api-lib/src/admin_api_lib/impl/summarizer/langchain_summarizer.py +++ b/libs/admin-api-lib/src/admin_api_lib/impl/summarizer/langchain_summarizer.py @@ -1,20 +1,24 @@ """Module for the LangchainSummarizer class.""" +import asyncio import logging -import traceback from typing import Optional from langchain.text_splitter import RecursiveCharacterTextSplitter from langchain_core.documents import Document from langchain_core.runnables import Runnable, RunnableConfig, ensure_config +from openai import APIConnectionError, APIError, APITimeoutError, RateLimitError +from admin_api_lib.impl.settings.summarizer_settings import SummarizerSettings from admin_api_lib.summarizer.summarizer import ( Summarizer, SummarizerInput, SummarizerOutput, ) from rag_core_lib.impl.langfuse_manager.langfuse_manager import LangfuseManager +from rag_core_lib.impl.settings.retry_decorator_settings import RetryDecoratorSettings from rag_core_lib.impl.utils.async_threadsafe_semaphore import AsyncThreadsafeSemaphore +from rag_core_lib.impl.utils.retry_decorator import retry_with_backoff logger = logging.getLogger(__name__) @@ -32,10 +36,15 @@ def __init__( langfuse_manager: LangfuseManager, chunker: RecursiveCharacterTextSplitter, semaphore: AsyncThreadsafeSemaphore, + summarizer_settings: SummarizerSettings, + retry_decorator_settings: RetryDecoratorSettings, ): self._chunker = chunker self._langfuse_manager = langfuse_manager self._semaphore = semaphore + self._retry_decorator_settings = self._create_retry_decorator_settings( + summarizer_settings, retry_decorator_settings + ) async def ainvoke(self, query: SummarizerInput, config: Optional[RunnableConfig] = None) -> SummarizerOutput: """ @@ -65,40 +74,65 @@ async def ainvoke(self, query: SummarizerInput, config: Optional[RunnableConfig] """ assert query, "Query is empty: %s" % query # noqa S101 config = ensure_config(config) - tries_remaining = config.get("configurable", {}).get("tries_remaining", 3) - logger.debug("Tries remaining %d" % tries_remaining) - if tries_remaining < 0: - raise Exception("Summary creation failed.") document = Document(page_content=query) langchain_documents = self._chunker.split_documents([document]) + logger.debug("Summarizing %d chunk(s)...", len(langchain_documents)) - outputs = [] - for langchain_document in langchain_documents: - async with self._semaphore: - try: - result = await self._create_chain().ainvoke({"text": langchain_document.page_content}, config) - # Extract content from AIMessage if it's not already a string - content = result.content if hasattr(result, "content") else str(result) - outputs.append(content) - except Exception as e: - logger.error("Error in summarizing langchain doc: %s %s", e, traceback.format_exc()) - config["tries_remaining"] = tries_remaining - 1 - result = await self._create_chain().ainvoke({"text": langchain_document.page_content}, config) - # Extract content from AIMessage if it's not already a string - content = result.content if hasattr(result, "content") else str(result) - outputs.append(content) + # Fan out with concurrency, bounded by your semaphore inside _summarize_chunk + tasks = [asyncio.create_task(self._summarize_chunk(doc.page_content, config)) for doc in langchain_documents] + outputs = await asyncio.gather(*tasks) if len(outputs) == 1: return outputs[0] - summary = " ".join(outputs) + + merged = " ".join(outputs) logger.debug( - "Reduced number of chars from %d to %d" - % (len("".join([x.page_content for x in langchain_documents])), len(summary)) + "Reduced number of chars from %d to %d", + len("".join([x.page_content for x in langchain_documents])), + len(merged), ) - return await self.ainvoke(summary, config) + return await self._summarize_chunk(merged, config) + + def _create_retry_decorator_settings( + self, summarizer_settings: SummarizerSettings, retry_decorator_settings: RetryDecoratorSettings + ): + fields = [ + "max_retries", + "retry_base_delay", + "retry_max_delay", + "backoff_factor", + "attempt_cap", + "jitter_min", + "jitter_max", + ] + settings_kwargs = { + field: getattr(summarizer_settings, field) + if getattr(summarizer_settings, field) is not None + else getattr(retry_decorator_settings, field) + for field in fields + } + return RetryDecoratorSettings(**settings_kwargs) def _create_chain(self) -> Runnable: return self._langfuse_manager.get_base_prompt(self.__class__.__name__) | self._langfuse_manager.get_base_llm( self.__class__.__name__ ) + + def _retry_with_backoff_wrapper(self): + return retry_with_backoff( + settings=self._retry_decorator_settings, + exceptions=(APIError, RateLimitError, APITimeoutError, APIConnectionError), + rate_limit_exceptions=(RateLimitError,), + logger=logger, + ) + + async def _summarize_chunk(self, text: str, config: Optional[RunnableConfig]) -> SummarizerOutput: + @self._retry_with_backoff_wrapper() + async def _call(text: str, config: Optional[RunnableConfig]) -> SummarizerOutput: + response = await self._create_chain().ainvoke({"text": text}, config) + return response.content if hasattr(response, "content") else str(response) + + # Hold the semaphore for the entire retry lifecycle + async with self._semaphore: + return await _call(text, config) From 28665936b8e1c01749fed216817eff5460625194 Mon Sep 17 00:00:00 2001 From: Andreas Klos Date: Thu, 9 Oct 2025 09:48:13 +0200 Subject: [PATCH 10/15] refactor: remove redundant validation checks in RetryDecoratorSettings --- .../rag_core_lib/impl/settings/retry_decorator_settings.py | 5 ----- 1 file changed, 5 deletions(-) diff --git a/libs/rag-core-lib/src/rag_core_lib/impl/settings/retry_decorator_settings.py b/libs/rag-core-lib/src/rag_core_lib/impl/settings/retry_decorator_settings.py index b306af1b..bc9a6137 100644 --- a/libs/rag-core-lib/src/rag_core_lib/impl/settings/retry_decorator_settings.py +++ b/libs/rag-core-lib/src/rag_core_lib/impl/settings/retry_decorator_settings.py @@ -75,9 +75,4 @@ def _check_relations(self) -> "RetryDecoratorSettings": # Ensure jitter_max >= jitter_min if self.jitter_max < self.jitter_min: raise ValueError("jitter_max must be >= jitter_min") - # Ensure retry_max_delay is meaningful vs base - if self.retry_max_delay <= 0: - raise ValueError("retry_max_delay must be > 0") - if self.backoff_factor < 1: - raise ValueError("backoff_factor must be >= 1") return self From 210c2411735d24100d2df82033452e81c97eeac6 Mon Sep 17 00:00:00 2001 From: Andreas Klos Date: Thu, 9 Oct 2025 10:07:07 +0200 Subject: [PATCH 11/15] refactor: equip the stackit embedder with a retry decorator. (#91) This pull request introduces configurable retry behavior for the `StackitEmbedder`, allowing for fine-grained control of retry and backoff parameters via environment variables, Helm chart values, or code. The changes ensure that retry settings can be overridden per embedder instance, falling back to shared defaults when not specified. Documentation and dependency injection are updated to reflect this new flexibility. **Embedder Retry Configuration** * Added new optional retry-related fields (`max_retries`, `retry_base_delay`, `retry_max_delay`, `backoff_factor`, `attempt_cap`, `jitter_min`, `jitter_max`) to the `StackitEmbedderSettings` model, allowing per-embedder overrides of retry/backoff parameters. [[1]](diffhunk://#diff-0e502aa8b53287c8b12f5f4d053e9ae904620403c6c502df29f9673f4ae88d09R21-R34) [[2]](diffhunk://#diff-0e502aa8b53287c8b12f5f4d053e9ae904620403c6c502df29f9673f4ae88d09R46-R86) * Updated the `StackitEmbedder` implementation to use a shared retry decorator with exponential backoff, resolving settings from both `StackitEmbedderSettings` and fallback `RetryDecoratorSettings`. The retry logic now handles OpenAI API errors and rate limits robustly. [[1]](diffhunk://#diff-7ebf8bf6adafb79699aea6bcd32de76398f9734e795f1d3c53cf524f1d69a5a1L4-R38) [[2]](diffhunk://#diff-7ebf8bf6adafb79699aea6bcd32de76398f9734e795f1d3c53cf524f1d69a5a1R63-R73) [[3]](diffhunk://#diff-7ebf8bf6adafb79699aea6bcd32de76398f9734e795f1d3c53cf524f1d69a5a1L72-R138) **Dependency Injection and Configuration** * Modified the dependency container to inject both `StackitEmbedderSettings` and `RetryDecoratorSettings` into the `StackitEmbedder`, supporting the new configuration pattern. [[1]](diffhunk://#diff-483b37f4ebbc24c973c3b170542171d90c65f3c6b68f1a6d598ce8964a94be7bR66) [[2]](diffhunk://#diff-483b37f4ebbc24c973c3b170542171d90c65f3c6b68f1a6d598ce8964a94be7bR93) [[3]](diffhunk://#diff-483b37f4ebbc24c973c3b170542171d90c65f3c6b68f1a6d598ce8964a94be7bL101-R103) * Added corresponding environment variable keys to the Helm chart (`values.yaml`), enabling retry configuration via deployment configuration for both backend and adminBackend services. [[1]](diffhunk://#diff-673dd2d3d4e66a8fd4e45f9c1c9900711313f946bf8b6a89e96c954988fc14f3R195-R202) [[2]](diffhunk://#diff-673dd2d3d4e66a8fd4e45f9c1c9900711313f946bf8b6a89e96c954988fc14f3R325) **Documentation Updates** * Documented the new retry configuration mechanism in `libs/README.md`, explaining how override and fallback resolution works, and how to configure via environment variables and Helm chart values. [[1]](diffhunk://#diff-34194a117b05d75d22ca968cdb7d540839dc7a0eb33960fbca668b5a6ade87cbR11) [[2]](diffhunk://#diff-34194a117b05d75d22ca968cdb7d540839dc7a0eb33960fbca668b5a6ade87cbR103-R128) **tackles following issue:** https://github.com/stackitcloud/rag-template/issues/87 --- infrastructure/rag/values.yaml | 9 +++ libs/README.md | 27 +++++++++ .../impl/summarizer/langchain_summarizer.py | 34 ++++------- .../src/rag_core_api/dependency_container.py | 4 +- .../impl/embeddings/stackit_embedder.py | 45 +++++++++++--- .../settings/stackit_embedder_settings.py | 58 ++++++++++++++++++- .../impl/utils/retry_decorator.py | 38 ++++++++++++ 7 files changed, 183 insertions(+), 32 deletions(-) diff --git a/infrastructure/rag/values.yaml b/infrastructure/rag/values.yaml index 74c54ddc..e1294614 100644 --- a/infrastructure/rag/values.yaml +++ b/infrastructure/rag/values.yaml @@ -197,6 +197,14 @@ backend: stackitEmbedder: STACKIT_EMBEDDER_MODEL: "intfloat/e5-mistral-7b-instruct" STACKIT_EMBEDDER_BASE_URL: https://api.openai-compat.model-serving.eu01.onstackit.cloud/v1 + # Retry settings (optional). If omitted, fall back to shared RETRY_DECORATOR_* values. + STACKIT_EMBEDDER_MAX_RETRIES: "5" + STACKIT_EMBEDDER_RETRY_BASE_DELAY: "0.5" + STACKIT_EMBEDDER_RETRY_MAX_DELAY: "600" + STACKIT_EMBEDDER_BACKOFF_FACTOR: "2" + STACKIT_EMBEDDER_ATTEMPT_CAP: "6" + STACKIT_EMBEDDER_JITTER_MIN: "0.05" + STACKIT_EMBEDDER_JITTER_MAX: "0.25" ollama: OLLAMA_MODEL: "llama3.2:3b-instruct-fp16" OLLAMA_BASE_URL: "http://rag-ollama:11434" @@ -319,6 +327,7 @@ adminBackend: summarizer: SUMMARIZER_MAXIMUM_INPUT_SIZE: "8000" SUMMARIZER_MAXIMUM_CONCURRENCY: "10" + # Retry settings (optional). If omitted, fall back to shared RETRY_DECORATOR_* values. SUMMARIZER_MAX_RETRIES: "5" SUMMARIZER_RETRY_BASE_DELAY: "0.5" SUMMARIZER_RETRY_MAX_DELAY: "600" diff --git a/libs/README.md b/libs/README.md index f2b82e13..960e7cfb 100644 --- a/libs/README.md +++ b/libs/README.md @@ -8,6 +8,7 @@ It consists of the following python packages: - [1.1 Requirements](#11-requirements) - [1.2 Endpoints](#12-endpoints) - [1.3 Replaceable parts](#13-replaceable-parts) + - [1.4 Embedder retry behavior](#14-embedder-retry-behavior) - [`2. Admin API lib`](#2-admin-api-lib) - [2.1 Requirements](#21-requirements) - [2.2 Endpoints](#22-endpoints) @@ -99,6 +100,32 @@ Uploaded documents are required to contain the following metadata: | chat_endpoint | [`rag_core_api.api_endpoints.chat.Chat`](./rag-core-api/src/rag_core_api/api_endpoints/chat.py) | [`rag_core_api.impl.api_endpoints.default_chat.DefaultChat`](./rag-core-api/src/rag_core_api/impl/api_endpoints/default_chat.py) | Implementation of the chat endpoint. Default implementation just calls the *traced_chat_graph* | | ragas_llm | `langchain_core.language_models.chat_models.BaseChatModel` | `langchain_openai.ChatOpenAI` or `langchain_ollama.ChatOllama` | The LLM used for the ragas evaluation. | +### 1.4 Embedder retry behavior + +The default STACKIT embedder implementation (`StackitEmbedder`) uses the shared retry decorator with exponential backoff from the `rag-core-lib`. + +- Decorator: `rag_core_lib.impl.utils.retry_decorator.retry_with_backoff` +- Base settings (fallback): [`RetryDecoratorSettings`](./rag-core-lib/src/rag_core_lib/impl/settings/retry_decorator_settings.py) +- Per-embedder overrides: [`StackitEmbedderSettings`](./rag-core-api/src/rag_core_api/impl/settings/stackit_embedder_settings.py) + +How it resolves settings + +- Each retry-related field in `StackitEmbedderSettings` is optional. When a field is provided (not None), it overrides the corresponding value from `RetryDecoratorSettings`. +- When a field is not provided (None), the embedder falls back to the value from `RetryDecoratorSettings`. + +Configuring via environment variables + +- Embedder-specific (prefix `STACKIT_EMBEDDER_`): + - `STACKIT_EMBEDDER_MAX_RETRIES` + - `STACKIT_EMBEDDER_RETRY_BASE_DELAY` + - `STACKIT_EMBEDDER_RETRY_MAX_DELAY` + - `STACKIT_EMBEDDER_BACKOFF_FACTOR` + - `STACKIT_EMBEDDER_ATTEMPT_CAP` + - `STACKIT_EMBEDDER_JITTER_MIN` + - `STACKIT_EMBEDDER_JITTER_MAX` +- Global fallback (prefix `RETRY_DECORATOR_`): see section [4.2](#42-retry-decorator-exponential-backoff) for all keys and defaults. +- Helm chart: set the same keys under `backend.envs.stackitEmbedder` in [infrastructure/rag/values.yaml](../infrastructure/rag/values.yaml). + ## 2. Admin API Lib The Admin API Library contains all required components for file management capabilities for RAG systems, handling all document lifecycle operations. It also includes a default `dependency_container`, that is pre-configured and should fit most use-cases. diff --git a/libs/admin-api-lib/src/admin_api_lib/impl/summarizer/langchain_summarizer.py b/libs/admin-api-lib/src/admin_api_lib/impl/summarizer/langchain_summarizer.py index 0872ddaa..b6b6ab03 100644 --- a/libs/admin-api-lib/src/admin_api_lib/impl/summarizer/langchain_summarizer.py +++ b/libs/admin-api-lib/src/admin_api_lib/impl/summarizer/langchain_summarizer.py @@ -8,7 +8,9 @@ from langchain_core.documents import Document from langchain_core.runnables import Runnable, RunnableConfig, ensure_config from openai import APIConnectionError, APIError, APITimeoutError, RateLimitError +from openai import APIConnectionError, APIError, APITimeoutError, RateLimitError +from admin_api_lib.impl.settings.summarizer_settings import SummarizerSettings from admin_api_lib.impl.settings.summarizer_settings import SummarizerSettings from admin_api_lib.summarizer.summarizer import ( Summarizer, @@ -17,8 +19,9 @@ ) from rag_core_lib.impl.langfuse_manager.langfuse_manager import LangfuseManager from rag_core_lib.impl.settings.retry_decorator_settings import RetryDecoratorSettings +from rag_core_lib.impl.settings.retry_decorator_settings import RetryDecoratorSettings from rag_core_lib.impl.utils.async_threadsafe_semaphore import AsyncThreadsafeSemaphore -from rag_core_lib.impl.utils.retry_decorator import retry_with_backoff +from rag_core_lib.impl.utils.retry_decorator import create_retry_decorator_settings, retry_with_backoff logger = logging.getLogger(__name__) @@ -42,7 +45,7 @@ def __init__( self._chunker = chunker self._langfuse_manager = langfuse_manager self._semaphore = semaphore - self._retry_decorator_settings = self._create_retry_decorator_settings( + self._retry_decorator_settings = create_retry_decorator_settings( summarizer_settings, retry_decorator_settings ) @@ -79,6 +82,11 @@ async def ainvoke(self, query: SummarizerInput, config: Optional[RunnableConfig] langchain_documents = self._chunker.split_documents([document]) logger.debug("Summarizing %d chunk(s)...", len(langchain_documents)) + # Fan out with concurrency, bounded by your semaphore inside _summarize_chunk + tasks = [asyncio.create_task(self._summarize_chunk(doc.page_content, config)) for doc in langchain_documents] + outputs = await asyncio.gather(*tasks) + logger.debug("Summarizing %d chunk(s)...", len(langchain_documents)) + # Fan out with concurrency, bounded by your semaphore inside _summarize_chunk tasks = [asyncio.create_task(self._summarize_chunk(doc.page_content, config)) for doc in langchain_documents] outputs = await asyncio.gather(*tasks) @@ -86,6 +94,8 @@ async def ainvoke(self, query: SummarizerInput, config: Optional[RunnableConfig] if len(outputs) == 1: return outputs[0] + merged = " ".join(outputs) + merged = " ".join(outputs) logger.debug( "Reduced number of chars from %d to %d", @@ -94,26 +104,6 @@ async def ainvoke(self, query: SummarizerInput, config: Optional[RunnableConfig] ) return await self._summarize_chunk(merged, config) - def _create_retry_decorator_settings( - self, summarizer_settings: SummarizerSettings, retry_decorator_settings: RetryDecoratorSettings - ): - fields = [ - "max_retries", - "retry_base_delay", - "retry_max_delay", - "backoff_factor", - "attempt_cap", - "jitter_min", - "jitter_max", - ] - settings_kwargs = { - field: getattr(summarizer_settings, field) - if getattr(summarizer_settings, field) is not None - else getattr(retry_decorator_settings, field) - for field in fields - } - return RetryDecoratorSettings(**settings_kwargs) - def _create_chain(self) -> Runnable: return self._langfuse_manager.get_base_prompt(self.__class__.__name__) | self._langfuse_manager.get_base_llm( self.__class__.__name__ diff --git a/libs/rag-core-api/src/rag_core_api/dependency_container.py b/libs/rag-core-api/src/rag_core_api/dependency_container.py index 57cff6e3..05d9d4da 100644 --- a/libs/rag-core-api/src/rag_core_api/dependency_container.py +++ b/libs/rag-core-api/src/rag_core_api/dependency_container.py @@ -63,6 +63,7 @@ from rag_core_lib.impl.settings.langfuse_settings import LangfuseSettings from rag_core_lib.impl.settings.ollama_llm_settings import OllamaSettings from rag_core_lib.impl.settings.rag_class_types_settings import RAGClassTypeSettings +from rag_core_lib.impl.settings.retry_decorator_settings import RetryDecoratorSettings from rag_core_lib.impl.settings.stackit_vllm_settings import StackitVllmSettings from rag_core_lib.impl.tracers.langfuse_traced_runnable import LangfuseTracedRunnable from rag_core_lib.impl.utils.async_threadsafe_semaphore import AsyncThreadsafeSemaphore @@ -89,6 +90,7 @@ class DependencyContainer(DeclarativeContainer): stackit_embedder_settings = StackitEmbedderSettings() chat_history_settings = ChatHistorySettings() sparse_embedder_settings = SparseEmbedderSettings() + retry_decorator_settings = RetryDecoratorSettings() chat_history_config.from_dict(chat_history_settings.model_dump()) class_selector_config.from_dict(rag_class_type_settings.model_dump() | embedder_class_type_settings.model_dump()) @@ -98,7 +100,7 @@ class DependencyContainer(DeclarativeContainer): ollama=Singleton( LangchainCommunityEmbedder, embedder=Singleton(OllamaEmbeddings, **ollama_embedder_settings.model_dump()) ), - stackit=Singleton(StackitEmbedder, stackit_embedder_settings), + stackit=Singleton(StackitEmbedder, stackit_embedder_settings, retry_decorator_settings), ) sparse_embedder = Singleton(FastEmbedSparse, **sparse_embedder_settings.model_dump()) diff --git a/libs/rag-core-api/src/rag_core_api/impl/embeddings/stackit_embedder.py b/libs/rag-core-api/src/rag_core_api/impl/embeddings/stackit_embedder.py index 65d67a1f..ec804d2d 100644 --- a/libs/rag-core-api/src/rag_core_api/impl/embeddings/stackit_embedder.py +++ b/libs/rag-core-api/src/rag_core_api/impl/embeddings/stackit_embedder.py @@ -1,16 +1,23 @@ """Module that contains the StackitEmbedder class.""" from langchain_core.embeddings import Embeddings -from openai import OpenAI +from openai import OpenAI, APIConnectionError, APIError, APITimeoutError, RateLimitError from rag_core_api.embeddings.embedder import Embedder from rag_core_api.impl.settings.stackit_embedder_settings import StackitEmbedderSettings +import logging +from rag_core_lib.impl.settings.retry_decorator_settings import RetryDecoratorSettings +from rag_core_lib.impl.utils.retry_decorator import create_retry_decorator_settings, retry_with_backoff + +logger = logging.getLogger(__name__) class StackitEmbedder(Embedder, Embeddings): """A class that represents any Langchain provided Embedder.""" - def __init__(self, stackit_embedder_settings: StackitEmbedderSettings): + def __init__( + self, stackit_embedder_settings: StackitEmbedderSettings, retry_decorator_settings: RetryDecoratorSettings + ): """ Initialize the StackitEmbedder with the given settings. @@ -18,12 +25,17 @@ def __init__(self, stackit_embedder_settings: StackitEmbedderSettings): ---------- stackit_embedder_settings : StackitEmbedderSettings The settings for configuring the StackitEmbedder, including the API key and base URL. + retry_decorator_settings : RetryDecoratorSettings + Default retry settings used as fallback when StackitEmbedderSettings leaves fields unset. """ self._client = OpenAI( api_key=stackit_embedder_settings.api_key, base_url=stackit_embedder_settings.base_url, ) self._settings = stackit_embedder_settings + self._retry_decorator_settings = create_retry_decorator_settings( + stackit_embedder_settings, retry_decorator_settings + ) def get_embedder(self) -> "StackitEmbedder": """Return the embedder instance. @@ -48,12 +60,16 @@ def embed_documents(self, texts: list[str]) -> list[list[float]]: list[list[float]] A list where each element is a list of floats representing the embedded vector of a document. """ - responses = self._client.embeddings.create( - input=texts, - model=self._settings.model, - ) - return [data.embedding for data in responses.data] + @self._retry_with_backoff_wrapper() + def _call(texts: list[str]) -> list[list[float]]: + responses = self._client.embeddings.create( + input=texts, + model=self._settings.model, + ) + return [data.embedding for data in responses.data] + + return _call(texts) def embed_query(self, text: str) -> list[float]: """ @@ -69,4 +85,17 @@ def embed_query(self, text: str) -> list[float]: list[float] The embedded representation of the query text. """ - return self.embed_documents([text])[0] + embeddings_list = self.embed_documents([text]) + if embeddings_list: + embeddings = embeddings_list[0] + return embeddings if embeddings else [] + logger.warning("No embeddings found for query: %s", text) + return embeddings_list + + def _retry_with_backoff_wrapper(self): + return retry_with_backoff( + settings=self._retry_decorator_settings, + exceptions=(APIError, RateLimitError, APITimeoutError, APIConnectionError), + rate_limit_exceptions=(RateLimitError,), + logger=logger, + ) diff --git a/libs/rag-core-api/src/rag_core_api/impl/settings/stackit_embedder_settings.py b/libs/rag-core-api/src/rag_core_api/impl/settings/stackit_embedder_settings.py index e451f06c..ddf09b70 100644 --- a/libs/rag-core-api/src/rag_core_api/impl/settings/stackit_embedder_settings.py +++ b/libs/rag-core-api/src/rag_core_api/impl/settings/stackit_embedder_settings.py @@ -1,6 +1,7 @@ """Module contains settings regarding the stackit embedder.""" -from pydantic import Field +from typing import Optional +from pydantic import Field, PositiveInt from pydantic_settings import BaseSettings @@ -17,6 +18,20 @@ class StackitEmbedderSettings(BaseSettings): (default "https://e629124b-accc-4e25-a1cc-dc57ac741e1d.model-serving.eu01.onstackit.cloud/v1"). api_key : str The API key for authentication. + max_retries: Optional[PositiveInt] + Total retries, not counting the initial attempt. + retry_base_delay: Optional[float] + Base delay in seconds for the first retry. + retry_max_delay: Optional[float] + Maximum delay cap in seconds for any single wait. + backoff_factor: Optional[float] + Exponential backoff factor (>= 1). + attempt_cap: Optional[int] + Cap for exponent growth (backoff_factor ** attempt_cap). + jitter_min: Optional[float] + Minimum jitter in seconds. + jitter_max: Optional[float] + Maximum jitter in seconds. """ class Config: @@ -28,3 +43,44 @@ class Config: model: str = Field(default="intfloat/e5-mistral-7b-instruct") base_url: str = Field(default="https://e629124b-accc-4e25-a1cc-dc57ac741e1d.model-serving.eu01.onstackit.cloud/v1") api_key: str = Field(default="") + max_retries: Optional[PositiveInt] = Field( + default=None, + title="Max Retries", + description="Total retries, not counting the initial attempt.", + ) + retry_base_delay: Optional[float] = Field( + default=None, + ge=0, + title="Retry Base Delay", + description="Base delay in seconds for the first retry.", + ) + retry_max_delay: Optional[float] = Field( + default=None, + gt=0, + title="Retry Max Delay", + description="Maximum delay cap in seconds for any single wait.", + ) + backoff_factor: Optional[float] = Field( + default=None, + ge=1.0, + title="Backoff Factor", + description="Exponential backoff factor (>= 1).", + ) + attempt_cap: Optional[int] = Field( + default=None, + ge=0, + title="Attempt Cap", + description="Cap for exponent growth (backoff_factor ** attempt_cap).", + ) + jitter_min: Optional[float] = Field( + default=None, + ge=0.0, + title="Jitter Min (s)", + description="Minimum jitter in seconds.", + ) + jitter_max: Optional[float] = Field( + default=None, + ge=0.0, + title="Jitter Max (s)", + description="Maximum jitter in seconds.", + ) diff --git a/libs/rag-core-lib/src/rag_core_lib/impl/utils/retry_decorator.py b/libs/rag-core-lib/src/rag_core_lib/impl/utils/retry_decorator.py index 8c4e94b7..2ded2898 100644 --- a/libs/rag-core-lib/src/rag_core_lib/impl/utils/retry_decorator.py +++ b/libs/rag-core-lib/src/rag_core_lib/impl/utils/retry_decorator.py @@ -8,6 +8,8 @@ from functools import wraps from typing import Callable, Optional, ParamSpec, TypeVar +from pydantic_settings import BaseSettings + from rag_core_lib.impl.settings.retry_decorator_settings import RetryDecoratorSettings from rag_core_lib.impl.utils.utils import ( headers_from_exception, @@ -166,3 +168,39 @@ def retry_with_backoff( logger=logger, ) return engine.decorate + +def create_retry_decorator_settings( + ai_settings: BaseSettings, retry_decorator_settings: RetryDecoratorSettings +) -> RetryDecoratorSettings: + """Create retry decorator settings based on AI and default settings. + + If the corresponding field in ai_settings is not set, the value from retry_decorator_settings will be used. + + Parameters + ---------- + ai_settings : BaseSettings + Those are the AI settings, e.g. Embeddings, Summarizers etc. + retry_decorator_settings : RetryDecoratorSettings + Those are the default retry settings. + + Returns + ------- + RetryDecoratorSettings + The combined retry settings. + """ + fields = [ + "max_retries", + "retry_base_delay", + "retry_max_delay", + "backoff_factor", + "attempt_cap", + "jitter_min", + "jitter_max", + ] + settings_kwargs = { + field: getattr(ai_settings, field) + if getattr(ai_settings, field) is not None + else getattr(retry_decorator_settings, field) + for field in fields + } + return RetryDecoratorSettings(**settings_kwargs) From ba120c161ff2a844683a1334743f9efe6c35c35d Mon Sep 17 00:00:00 2001 From: Andreas Klos Date: Thu, 9 Oct 2025 10:12:47 +0200 Subject: [PATCH 12/15] refactor: clean up import statements and streamline retry decorator settings initialization --- .../admin_api_lib/impl/summarizer/langchain_summarizer.py | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/libs/admin-api-lib/src/admin_api_lib/impl/summarizer/langchain_summarizer.py b/libs/admin-api-lib/src/admin_api_lib/impl/summarizer/langchain_summarizer.py index b6b6ab03..b7b0bd3c 100644 --- a/libs/admin-api-lib/src/admin_api_lib/impl/summarizer/langchain_summarizer.py +++ b/libs/admin-api-lib/src/admin_api_lib/impl/summarizer/langchain_summarizer.py @@ -8,9 +8,7 @@ from langchain_core.documents import Document from langchain_core.runnables import Runnable, RunnableConfig, ensure_config from openai import APIConnectionError, APIError, APITimeoutError, RateLimitError -from openai import APIConnectionError, APIError, APITimeoutError, RateLimitError -from admin_api_lib.impl.settings.summarizer_settings import SummarizerSettings from admin_api_lib.impl.settings.summarizer_settings import SummarizerSettings from admin_api_lib.summarizer.summarizer import ( Summarizer, @@ -19,7 +17,6 @@ ) from rag_core_lib.impl.langfuse_manager.langfuse_manager import LangfuseManager from rag_core_lib.impl.settings.retry_decorator_settings import RetryDecoratorSettings -from rag_core_lib.impl.settings.retry_decorator_settings import RetryDecoratorSettings from rag_core_lib.impl.utils.async_threadsafe_semaphore import AsyncThreadsafeSemaphore from rag_core_lib.impl.utils.retry_decorator import create_retry_decorator_settings, retry_with_backoff @@ -45,9 +42,7 @@ def __init__( self._chunker = chunker self._langfuse_manager = langfuse_manager self._semaphore = semaphore - self._retry_decorator_settings = create_retry_decorator_settings( - summarizer_settings, retry_decorator_settings - ) + self._retry_decorator_settings = create_retry_decorator_settings(summarizer_settings, retry_decorator_settings) async def ainvoke(self, query: SummarizerInput, config: Optional[RunnableConfig] = None) -> SummarizerOutput: """ From 63b9eae25a871794afb05003a7e6d2b1211204b5 Mon Sep 17 00:00:00 2001 From: Andreas Klos Date: Thu, 9 Oct 2025 10:13:54 +0200 Subject: [PATCH 13/15] refactor: improve readability of settings initialization in create_retry_decorator_settings --- .../src/rag_core_lib/impl/utils/retry_decorator.py | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/libs/rag-core-lib/src/rag_core_lib/impl/utils/retry_decorator.py b/libs/rag-core-lib/src/rag_core_lib/impl/utils/retry_decorator.py index 2ded2898..ad4fcf96 100644 --- a/libs/rag-core-lib/src/rag_core_lib/impl/utils/retry_decorator.py +++ b/libs/rag-core-lib/src/rag_core_lib/impl/utils/retry_decorator.py @@ -169,6 +169,7 @@ def retry_with_backoff( ) return engine.decorate + def create_retry_decorator_settings( ai_settings: BaseSettings, retry_decorator_settings: RetryDecoratorSettings ) -> RetryDecoratorSettings: @@ -198,9 +199,11 @@ def create_retry_decorator_settings( "jitter_max", ] settings_kwargs = { - field: getattr(ai_settings, field) - if getattr(ai_settings, field) is not None - else getattr(retry_decorator_settings, field) + field: ( + getattr(ai_settings, field) + if getattr(ai_settings, field) is not None + else getattr(retry_decorator_settings, field) + ) for field in fields } return RetryDecoratorSettings(**settings_kwargs) From c3178976c6944f815b15f0df3258d51e6e93d8ae Mon Sep 17 00:00:00 2001 From: Andreas Klos Date: Thu, 9 Oct 2025 10:28:23 +0200 Subject: [PATCH 14/15] refactor: enhance settings initialization and validation in Summarizer and StackitEmbedder settings --- .../impl/settings/summarizer_settings.py | 17 ++++++++++------- .../impl/summarizer/langchain_summarizer.py | 6 ------ .../impl/settings/stackit_embedder_settings.py | 17 ++++++++++------- 3 files changed, 20 insertions(+), 20 deletions(-) diff --git a/libs/admin-api-lib/src/admin_api_lib/impl/settings/summarizer_settings.py b/libs/admin-api-lib/src/admin_api_lib/impl/settings/summarizer_settings.py index b3138ce0..3ea7c44a 100644 --- a/libs/admin-api-lib/src/admin_api_lib/impl/settings/summarizer_settings.py +++ b/libs/admin-api-lib/src/admin_api_lib/impl/settings/summarizer_settings.py @@ -1,8 +1,8 @@ """Contains settings for summarizer.""" from typing import Optional -from pydantic import Field, PositiveInt -from pydantic_settings import BaseSettings +from pydantic import Field, PositiveInt, model_validator +from pydantic_settings import BaseSettings, SettingsConfigDict class SummarizerSettings(BaseSettings): @@ -31,11 +31,7 @@ class SummarizerSettings(BaseSettings): Maximum jitter in seconds. """ - class Config: - """Config class for reading Fields from env.""" - - env_prefix = "SUMMARIZER_" - case_sensitive = False + model_config = SettingsConfigDict(env_prefix="SUMMARIZER_", case_sensitive=False) maximum_input_size: int = Field(default=8000) maximum_concurrency: int = Field(default=10) @@ -80,3 +76,10 @@ class Config: title="Jitter Max (s)", description="Maximum jitter in seconds.", ) + + @model_validator(mode="after") + def _check_relations(self) -> "SummarizerSettings": + # Ensure jitter_max >= jitter_min + if self.jitter_max < self.jitter_min: + raise ValueError("jitter_max must be >= jitter_min") + return self diff --git a/libs/admin-api-lib/src/admin_api_lib/impl/summarizer/langchain_summarizer.py b/libs/admin-api-lib/src/admin_api_lib/impl/summarizer/langchain_summarizer.py index b7b0bd3c..b9cdfdd3 100644 --- a/libs/admin-api-lib/src/admin_api_lib/impl/summarizer/langchain_summarizer.py +++ b/libs/admin-api-lib/src/admin_api_lib/impl/summarizer/langchain_summarizer.py @@ -77,11 +77,6 @@ async def ainvoke(self, query: SummarizerInput, config: Optional[RunnableConfig] langchain_documents = self._chunker.split_documents([document]) logger.debug("Summarizing %d chunk(s)...", len(langchain_documents)) - # Fan out with concurrency, bounded by your semaphore inside _summarize_chunk - tasks = [asyncio.create_task(self._summarize_chunk(doc.page_content, config)) for doc in langchain_documents] - outputs = await asyncio.gather(*tasks) - logger.debug("Summarizing %d chunk(s)...", len(langchain_documents)) - # Fan out with concurrency, bounded by your semaphore inside _summarize_chunk tasks = [asyncio.create_task(self._summarize_chunk(doc.page_content, config)) for doc in langchain_documents] outputs = await asyncio.gather(*tasks) @@ -91,7 +86,6 @@ async def ainvoke(self, query: SummarizerInput, config: Optional[RunnableConfig] merged = " ".join(outputs) - merged = " ".join(outputs) logger.debug( "Reduced number of chars from %d to %d", len("".join([x.page_content for x in langchain_documents])), diff --git a/libs/rag-core-api/src/rag_core_api/impl/settings/stackit_embedder_settings.py b/libs/rag-core-api/src/rag_core_api/impl/settings/stackit_embedder_settings.py index ddf09b70..ea13745b 100644 --- a/libs/rag-core-api/src/rag_core_api/impl/settings/stackit_embedder_settings.py +++ b/libs/rag-core-api/src/rag_core_api/impl/settings/stackit_embedder_settings.py @@ -1,8 +1,8 @@ """Module contains settings regarding the stackit embedder.""" from typing import Optional -from pydantic import Field, PositiveInt -from pydantic_settings import BaseSettings +from pydantic import Field, PositiveInt, model_validator +from pydantic_settings import BaseSettings, SettingsConfigDict class StackitEmbedderSettings(BaseSettings): @@ -34,11 +34,7 @@ class StackitEmbedderSettings(BaseSettings): Maximum jitter in seconds. """ - class Config: - """Config class for reading Fields from env.""" - - env_prefix = "STACKIT_EMBEDDER_" - case_sensitive = False + model_config = SettingsConfigDict(env_prefix="STACKIT_EMBEDDER_", case_sensitive=False) model: str = Field(default="intfloat/e5-mistral-7b-instruct") base_url: str = Field(default="https://e629124b-accc-4e25-a1cc-dc57ac741e1d.model-serving.eu01.onstackit.cloud/v1") @@ -84,3 +80,10 @@ class Config: title="Jitter Max (s)", description="Maximum jitter in seconds.", ) + + @model_validator(mode="after") + def _check_relations(self) -> "StackitEmbedderSettings": + # Ensure jitter_max >= jitter_min + if self.jitter_max < self.jitter_min: + raise ValueError("jitter_max must be >= jitter_min") + return self From b4f291bea8ccd974b1cfd2dc58a00f4db6c9877e Mon Sep 17 00:00:00 2001 From: Andreas Klos Date: Thu, 9 Oct 2025 12:27:34 +0200 Subject: [PATCH 15/15] refactor: add validation for jitter_min and jitter_max in Summarizer and StackitEmbedder settings --- .../src/admin_api_lib/impl/settings/summarizer_settings.py | 3 ++- .../rag_core_api/impl/settings/stackit_embedder_settings.py | 3 ++- .../src/rag_core_lib/impl/utils/retry_decorator.py | 2 ++ 3 files changed, 6 insertions(+), 2 deletions(-) diff --git a/libs/admin-api-lib/src/admin_api_lib/impl/settings/summarizer_settings.py b/libs/admin-api-lib/src/admin_api_lib/impl/settings/summarizer_settings.py index 3ea7c44a..8c513f41 100644 --- a/libs/admin-api-lib/src/admin_api_lib/impl/settings/summarizer_settings.py +++ b/libs/admin-api-lib/src/admin_api_lib/impl/settings/summarizer_settings.py @@ -79,7 +79,8 @@ class SummarizerSettings(BaseSettings): @model_validator(mode="after") def _check_relations(self) -> "SummarizerSettings": - # Ensure jitter_max >= jitter_min + if not self.jitter_min or not self.jitter_max: + return self if self.jitter_max < self.jitter_min: raise ValueError("jitter_max must be >= jitter_min") return self diff --git a/libs/rag-core-api/src/rag_core_api/impl/settings/stackit_embedder_settings.py b/libs/rag-core-api/src/rag_core_api/impl/settings/stackit_embedder_settings.py index ea13745b..4e99f2cd 100644 --- a/libs/rag-core-api/src/rag_core_api/impl/settings/stackit_embedder_settings.py +++ b/libs/rag-core-api/src/rag_core_api/impl/settings/stackit_embedder_settings.py @@ -83,7 +83,8 @@ class StackitEmbedderSettings(BaseSettings): @model_validator(mode="after") def _check_relations(self) -> "StackitEmbedderSettings": - # Ensure jitter_max >= jitter_min + if not self.jitter_min or not self.jitter_max: + return self if self.jitter_max < self.jitter_min: raise ValueError("jitter_max must be >= jitter_min") return self diff --git a/libs/rag-core-lib/src/rag_core_lib/impl/utils/retry_decorator.py b/libs/rag-core-lib/src/rag_core_lib/impl/utils/retry_decorator.py index ad4fcf96..3e00e3bf 100644 --- a/libs/rag-core-lib/src/rag_core_lib/impl/utils/retry_decorator.py +++ b/libs/rag-core-lib/src/rag_core_lib/impl/utils/retry_decorator.py @@ -206,4 +206,6 @@ def create_retry_decorator_settings( ) for field in fields } + if settings_kwargs["jitter_max"] < settings_kwargs["jitter_min"]: + raise ValueError("jitter_max must be >= jitter_min") return RetryDecoratorSettings(**settings_kwargs)