diff --git a/architecture/dataset-builders.md b/architecture/dataset-builders.md index 0cca1c7d8..cf06eea17 100644 --- a/architecture/dataset-builders.md +++ b/architecture/dataset-builders.md @@ -71,11 +71,7 @@ Both execution modes integrate skip at the same points: - **Sequential**: `_run_full_column_generator` and the fan-out methods (`_fan_out_with_threads`, `_fan_out_with_async`) call `_should_skip_cell` per record. Skipped rows are excluded from the generator input, then merged back with skip metadata preserved. A fast `_column_can_skip` check short-circuits the per-record evaluation when no skip config or propagation applies. - **Async**: `_run_cell` and `_run_batch` in `AsyncTaskScheduler` call `_should_skip_record` / `_apply_skip_to_record` with the same logic. Skipped cells report as skipped (not success) in progress tracking. -DAG edges are added for `skip.when` column references (both in `dag.py` and `ExecutionGraph.create`) so skip-gate columns are generated before the gated column. - -### DAG (Config-Level) - -`dataset_builders/utils/dag.py` provides `topologically_sort_column_configs` — builds a NetworkX graph from `required_columns`, side-effect columns, and `skip.when` references, returns a topological ordering. Used by both execution modes for initial column ordering. +DAG edges are added for `skip.when` column references in both `topologically_sort_column_configs` (compile-time sort) and `ExecutionGraph.create` (async runtime) so skip-gate columns are generated before the gated column. ### DatasetBatchManager @@ -118,7 +114,7 @@ DatasetBuilder.build() - **Dual execution engines behind one API.** The sequential engine is simpler and easier to debug; the async engine adds row-group parallelism for throughput. Users switch via an environment variable without changing their code. - **DAG-driven ordering** ensures columns with dependencies (e.g., a judge column that depends on a text column) are generated in the correct order, regardless of the order they appear in the config. - **Salvage rounds in async mode** retry failed tasks after all other tasks in a round complete, improving resilience against transient LLM failures without blocking the entire generation. -- **Separate config-level and runtime DAGs.** The config-level DAG (`dag.py`) determines column ordering; the runtime `ExecutionGraph` adds strategy-aware dependency tracking for the async scheduler. +- **Unified DAG construction.** `topologically_sort_column_configs` (in `execution_graph.py`) determines column ordering using Kahn's algorithm; the runtime `ExecutionGraph` adds strategy-aware dependency tracking for the async scheduler. ## Cross-References diff --git a/packages/data-designer-engine/src/data_designer/engine/dataset_builders/utils/config_compiler.py b/packages/data-designer-engine/src/data_designer/engine/dataset_builders/utils/config_compiler.py index 8112d87e8..208fa6d80 100644 --- a/packages/data-designer-engine/src/data_designer/engine/dataset_builders/utils/config_compiler.py +++ b/packages/data-designer-engine/src/data_designer/engine/dataset_builders/utils/config_compiler.py @@ -11,8 +11,8 @@ SamplerMultiColumnConfig, SeedDatasetMultiColumnConfig, ) -from data_designer.engine.dataset_builders.utils.dag import topologically_sort_column_configs from data_designer.engine.dataset_builders.utils.errors import ConfigCompilationError +from data_designer.engine.dataset_builders.utils.execution_graph import topologically_sort_column_configs def compile_dataset_builder_column_configs(config: DataDesignerConfig) -> list[DatasetBuilderColumnConfigT]: diff --git a/packages/data-designer-engine/src/data_designer/engine/dataset_builders/utils/dag.py b/packages/data-designer-engine/src/data_designer/engine/dataset_builders/utils/dag.py deleted file mode 100644 index 4b3e03670..000000000 --- a/packages/data-designer-engine/src/data_designer/engine/dataset_builders/utils/dag.py +++ /dev/null @@ -1,89 +0,0 @@ -# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. -# SPDX-License-Identifier: Apache-2.0 - -from __future__ import annotations - -import logging -from itertools import chain - -import data_designer.lazy_heavy_imports as lazy -from data_designer.config.column_types import ColumnConfigT -from data_designer.engine.column_generators.utils.generator_classification import column_type_used_in_execution_dag -from data_designer.engine.dataset_builders.utils.errors import ConfigCompilationError, DAGCircularDependencyError -from data_designer.logging import LOG_INDENT - -logger = logging.getLogger(__name__) - - -def topologically_sort_column_configs(column_configs: list[ColumnConfigT]) -> list[ColumnConfigT]: - dag = lazy.nx.DiGraph() - - non_dag_column_config_list = [ - col for col in column_configs if not column_type_used_in_execution_dag(col.column_type) - ] - dag_column_config_dict = { - col.name: col for col in column_configs if column_type_used_in_execution_dag(col.column_type) - } - - if len(dag_column_config_dict) == 0: - return non_dag_column_config_list - - side_effect_dict = {n: list(c.side_effect_columns) for n, c in dag_column_config_dict.items()} - all_side_effects = set(chain.from_iterable(side_effect_dict.values())) - - side_effect_to_producer: dict[str, str] = {} - for producer, cols in side_effect_dict.items(): - for col in cols: - existing = side_effect_to_producer.get(col) - if existing is not None and existing != producer: - raise ConfigCompilationError( - f"Side-effect column {col!r} is already produced by {existing!r}; " - f"cannot register a second producer {producer!r}. " - f"Use distinct side-effect column names for each pipeline stage." - ) - side_effect_to_producer[col] = producer - - logger.info("⛓️ Sorting column configs into a Directed Acyclic Graph") - for name, col in dag_column_config_dict.items(): - dag.add_node(name) - _add_dependency_edges( - dag, name, list(col.required_columns), dag_column_config_dict, side_effect_dict, all_side_effects, "" - ) - if col.skip is not None: - _add_dependency_edges( - dag, name, col.skip.columns, dag_column_config_dict, side_effect_dict, all_side_effects, "skip.when" - ) - - if not lazy.nx.is_directed_acyclic_graph(dag): - raise DAGCircularDependencyError( - "🛑 The Data Designer column configurations contain cyclic dependencies. Please " - "inspect the column configurations and ensure they can be sorted without " - "circular references." - ) - - sorted_columns = non_dag_column_config_list - sorted_columns.extend([dag_column_config_dict[n] for n in list(lazy.nx.topological_sort(dag))]) - - return sorted_columns - - -def _add_dependency_edges( - dag: lazy.nx.DiGraph, - name: str, - dep_names: list[str], - dag_column_config_dict: dict[str, ColumnConfigT], - side_effect_dict: dict[str, list[str]], - all_side_effects: set[str], - label: str, -) -> None: - """Add DAG edges from *dep_names* to *name*, resolving through side-effect parents.""" - for dep in dep_names: - if dep in dag_column_config_dict: - logger.debug(f"{LOG_INDENT}🔗 `{name}` {label} depends on `{dep}`") - dag.add_edge(dep, name) - elif dep in all_side_effects: - for parent, cols in side_effect_dict.items(): - if dep in cols: - logger.debug(f"{LOG_INDENT}🔗 `{name}` {label} depends on `{parent}` via `{dep}`") - dag.add_edge(parent, name) - break diff --git a/packages/data-designer-engine/src/data_designer/engine/dataset_builders/utils/execution_graph.py b/packages/data-designer-engine/src/data_designer/engine/dataset_builders/utils/execution_graph.py index 5cd41dd38..b090cf63d 100644 --- a/packages/data-designer-engine/src/data_designer/engine/dataset_builders/utils/execution_graph.py +++ b/packages/data-designer-engine/src/data_designer/engine/dataset_builders/utils/execution_graph.py @@ -9,12 +9,15 @@ from typing import TYPE_CHECKING from data_designer.config.column_configs import GenerationStrategy +from data_designer.config.column_types import ColumnConfigT +from data_designer.engine.column_generators.utils.generator_classification import column_type_used_in_execution_dag from data_designer.engine.dataset_builders.multi_column_configs import ( DatasetBuilderColumnConfigT, MultiColumnConfig, ) from data_designer.engine.dataset_builders.utils.errors import ConfigCompilationError, DAGCircularDependencyError from data_designer.engine.dataset_builders.utils.task_model import SliceRef +from data_designer.logging import LOG_INDENT logger = logging.getLogger(__name__) @@ -230,27 +233,12 @@ def get_topological_order(self) -> list[str]: if self._topological_order_cache is not None: return list(self._topological_order_cache) - in_degree: dict[str, int] = {col: 0 for col in self._columns} - for col, deps in self._upstream.items(): - if col in in_degree: - in_degree[col] = len(deps) - - queue = deque(col for col, deg in in_degree.items() if deg == 0) - order: list[str] = [] - while queue: - col = queue.popleft() - order.append(col) - for child in self._downstream.get(col, set()): - if child in in_degree: - in_degree[child] -= 1 - if in_degree[child] == 0: - queue.append(child) - - if len(order) != len(self._columns): - raise DAGCircularDependencyError( - f"The execution graph contains cyclic dependencies. Resolved {len(order)}/{len(self._columns)} columns." - ) - + order = _kahns_topological_sort( + self._columns, + self._upstream, + self._downstream, + "The execution graph contains cyclic dependencies.", + ) self._topological_order_cache = order return list(order) @@ -330,3 +318,106 @@ def to_mermaid(self) -> str: for dep in sorted(self._upstream.get(col, set())): lines.append(f" {dep} --> {col}") return "\n".join(lines) + + +def topologically_sort_column_configs(column_configs: list[ColumnConfigT]) -> list[ColumnConfigT]: + """Return column configs in dependency order using Kahn's algorithm. + + Non-DAG columns (samplers, seeds) are placed first, followed by DAG columns + sorted by ``required_columns`` and ``skip.when`` edges. Side-effect columns + are resolved to their producing column. + + Raises: + ConfigCompilationError: If two columns declare the same side-effect name. + DAGCircularDependencyError: If the dependency graph contains a cycle. + """ + non_dag_cols = [col for col in column_configs if not column_type_used_in_execution_dag(col.column_type)] + dag_col_dict = {col.name: col for col in column_configs if column_type_used_in_execution_dag(col.column_type)} + + if not dag_col_dict: + return non_dag_cols + + # side_effect_col_name -> producing column name + side_effect_map: dict[str, str] = {} + for name, col in dag_col_dict.items(): + for se_col in col.side_effect_columns: + existing = side_effect_map.get(se_col) + if existing is not None and existing != name: + raise ConfigCompilationError( + f"Side-effect column {se_col!r} is already produced by {existing!r}; " + f"cannot register a second producer {name!r}. " + f"Use distinct side-effect column names for each pipeline stage." + ) + side_effect_map[se_col] = name + + upstream: dict[str, set[str]] = {name: set() for name in dag_col_dict} + downstream: dict[str, set[str]] = {name: set() for name in dag_col_dict} + + logger.info("⛓️ Sorting column configs into a Directed Acyclic Graph") + for name, col in dag_col_dict.items(): + for req in col.required_columns: + _add_dag_edge(name, req, "required", dag_col_dict, side_effect_map, upstream, downstream) + if col.skip is not None: + for skip_col in col.skip.columns: + _add_dag_edge(name, skip_col, "skip.when", dag_col_dict, side_effect_map, upstream, downstream) + + order = _kahns_topological_sort( + list(dag_col_dict), + upstream, + downstream, + "🛑 The Data Designer column configurations contain cyclic dependencies. Please " + "inspect the column configurations and ensure they can be sorted without " + "circular references.", + ) + + return non_dag_cols + [dag_col_dict[n] for n in order] + + +def _add_dag_edge( + name: str, + dep: str, + label: str, + dag_col_dict: dict[str, ColumnConfigT], + side_effect_map: dict[str, str], + upstream: dict[str, set[str]], + downstream: dict[str, set[str]], +) -> None: + """Add a dependency edge from *dep*'s producer to *name* if the dep is a known DAG column. + + Self-edges are skipped, consistent with ``ExecutionGraph.create``. + The *label* parameter (``"required"`` or ``"skip.when"``) is included in + the debug log so the source of each edge is visible during tracing. + """ + resolved = dep if dep in dag_col_dict else side_effect_map.get(dep) + if resolved is None or resolved == name: + return + logger.debug(f"{LOG_INDENT}🔗 `{name}` depends on `{resolved}` [{label}]") + upstream[name].add(resolved) + downstream[resolved].add(name) + + +def _kahns_topological_sort( + nodes: list[str], + upstream: dict[str, set[str]], + downstream: dict[str, set[str]], + error_message: str, +) -> list[str]: + """Return a topological ordering of *nodes* using Kahn's algorithm. + + Raises: + DAGCircularDependencyError: If the graph contains a cycle. + """ + in_degree: dict[str, int] = {col: len(upstream.get(col, set())) for col in nodes} + queue: deque[str] = deque(col for col, deg in in_degree.items() if deg == 0) + order: list[str] = [] + while queue: + col = queue.popleft() + order.append(col) + for child in downstream.get(col, set()): + if child in in_degree: + in_degree[child] -= 1 + if in_degree[child] == 0: + queue.append(child) + if len(order) != len(nodes): + raise DAGCircularDependencyError(error_message) + return order diff --git a/packages/data-designer-engine/tests/engine/dataset_builders/utils/test_dag.py b/packages/data-designer-engine/tests/engine/dataset_builders/utils/test_topological_sort.py similarity index 66% rename from packages/data-designer-engine/tests/engine/dataset_builders/utils/test_dag.py rename to packages/data-designer-engine/tests/engine/dataset_builders/utils/test_topological_sort.py index bbb7aa9c8..6ad9d6504 100644 --- a/packages/data-designer-engine/tests/engine/dataset_builders/utils/test_dag.py +++ b/packages/data-designer-engine/tests/engine/dataset_builders/utils/test_topological_sort.py @@ -1,10 +1,13 @@ # SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. # SPDX-License-Identifier: Apache-2.0 +from __future__ import annotations + from typing import Any import pytest +from data_designer.config.base import SkipConfig from data_designer.config.column_configs import ( CustomColumnConfig, ExpressionColumnConfig, @@ -21,13 +24,13 @@ from data_designer.config.utils.code_lang import CodeLang from data_designer.config.validator_params import CodeValidatorParams from data_designer.engine.dataset_builders.multi_column_configs import SamplerMultiColumnConfig -from data_designer.engine.dataset_builders.utils.dag import topologically_sort_column_configs from data_designer.engine.dataset_builders.utils.errors import ConfigCompilationError, DAGCircularDependencyError +from data_designer.engine.dataset_builders.utils.execution_graph import topologically_sort_column_configs MODEL_ALIAS = "stub-model-alias" -def test_dag_construction(): +def test_dag_construction() -> None: column_configs = [] column_configs.append( SamplerMultiColumnConfig( @@ -82,17 +85,17 @@ def test_dag_construction(): assert sorted_column_configs[0].column_type == DataDesignerColumnType.SAMPLER - assert [c.name for c in sorted_column_configs[1:]] == [ - "test_code", - "test_validation", - "depends_on_validation", - "test_judge", - "test_code_and_depends_on_validation_reasoning_traces", - "uses_all_the_stuff", - ] + names = [c.name for c in sorted_column_configs[1:]] + assert names[0] == "test_code" + assert names[1] == "test_validation" + assert names[2] == "depends_on_validation" + # test_judge and test_code_and_depends_on_validation_reasoning_traces have no mutual + # dependency, so their relative order is not guaranteed by topological sort. + assert set(names[3:5]) == {"test_judge", "test_code_and_depends_on_validation_reasoning_traces"} + assert names[5] == "uses_all_the_stuff" -def test_circular_dependencies(): +def test_circular_dependencies() -> None: column_configs = [] column_configs.append( SamplerMultiColumnConfig( @@ -135,3 +138,37 @@ def gen_b(row: dict[str, Any]) -> dict[str, Any]: ] with pytest.raises(ConfigCompilationError, match="already produced by"): topologically_sort_column_configs(column_configs) + + +def test_side_effect_column_ordering() -> None: + """A column that depends on a side-effect column is sorted after its producer.""" + + @custom_column_generator(required_columns=["seed"], side_effect_columns=["seed_trace"]) + def gen_with_trace(row: dict[str, Any]) -> dict[str, Any]: + return row + + column_configs = [ + LLMTextColumnConfig(name="seed", prompt="generate seed", model_alias=MODEL_ALIAS), + ExpressionColumnConfig(name="consumer", expr="{{ seed_trace }}"), + CustomColumnConfig(name="producer", generator_function=gen_with_trace), + ] + sorted_configs = topologically_sort_column_configs(column_configs) + names = [c.name for c in sorted_configs] + assert names.index("producer") < names.index("consumer") + + +def test_skip_when_column_ordering() -> None: + """A column with skip.when referencing another DAG column is sorted after that column.""" + column_configs = [ + LLMTextColumnConfig(name="seed", prompt="generate seed", model_alias=MODEL_ALIAS), + LLMTextColumnConfig( + name="gated", + prompt="generate gated", + model_alias=MODEL_ALIAS, + skip=SkipConfig(when="{{ seed == 'bad' }}"), + ), + ] + # gated has no required_columns referencing seed, only a skip.when dependency + sorted_configs = topologically_sort_column_configs(column_configs) + names = [c.name for c in sorted_configs] + assert names.index("seed") < names.index("gated")