From eb5f96978da13689f59595289ffdb817bcb04147 Mon Sep 17 00:00:00 2001 From: Morgan Wowk Date: Fri, 8 May 2026 13:34:07 -0700 Subject: [PATCH] Wire Bugsnag into the orchestrator --- cloud_pipelines_backend/orchestrator_sql.py | 8 ++++++-- orchestrator_main.py | 2 ++ 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/cloud_pipelines_backend/orchestrator_sql.py b/cloud_pipelines_backend/orchestrator_sql.py index dfbc99f..b7acf71 100644 --- a/cloud_pipelines_backend/orchestrator_sql.py +++ b/cloud_pipelines_backend/orchestrator_sql.py @@ -21,6 +21,7 @@ from . import component_structures as structures from .launchers import common_annotations from .launchers import interfaces as launcher_interfaces +from .instrumentation import bugsnag_instrumentation from .instrumentation import contextual_logging from .instrumentation import metrics as app_metrics @@ -66,8 +67,9 @@ def run_loop(self): try: self.process_each_queue_once() time.sleep(self._sleep_seconds_between_queue_sweeps) - except Exception: + except Exception as exc: _logger.exception("Error while calling `process_each_queue_once`") + bugsnag_instrumentation.notify(exception=exc) def process_each_queue_once(self): queue_handlers = [ @@ -78,8 +80,9 @@ def process_each_queue_once(self): try: with self._session_factory() as session: queue_handler(session=session) - except Exception: + except Exception as exc: _logger.exception(f"Error while executing {queue_handler=}") + bugsnag_instrumentation.notify(exception=exc) def internal_process_queued_executions_queue(self, session: orm.Session): query = ( @@ -1064,6 +1067,7 @@ def _retry( def record_system_error_exception(execution: bts.ExecutionNode, exception: Exception): app_metrics.execution_system_errors.add(1) + bugsnag_instrumentation.notify(exception=exception, execution_id=str(execution.id)) if execution.extra_data is None: execution.extra_data = {} diff --git a/orchestrator_main.py b/orchestrator_main.py index 99b7e8f..58abd28 100644 --- a/orchestrator_main.py +++ b/orchestrator_main.py @@ -6,6 +6,7 @@ from sqlalchemy import orm from cloud_pipelines_backend import orchestrator_sql +from cloud_pipelines_backend.instrumentation import bugsnag_instrumentation from cloud_pipelines_backend.launchers import kubernetes_launchers from cloud_pipelines.orchestration.storage_providers import local_storage @@ -32,6 +33,7 @@ def main(): logger.addHandler(stderr_handler) logger.info("Starting the orchestrator") + bugsnag_instrumentation.setup(service_name="tangle-orchestrator") DEFAULT_DATABASE_URI = "sqlite:///db.sqlite" database_uri = os.environ.get("DATABASE_URI", DEFAULT_DATABASE_URI)