Support delete command for iceberg's copy on write mode #13639
liurenjie1024 merged 23 commits into NVIDIA:main from
Conversation
Signed-off-by: liurenjie1024 <liurenjie2008@gmail.com>
Pull Request Overview
This PR adds support for Iceberg's copy-on-write DELETE operations by implementing GPU acceleration for the ReplaceDataExec physical plan node. The changes enable GPU execution of DELETE commands when Iceberg tables are configured with copy-on-write mode.
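For readers unfamiliar with the mode: a copy-on-write DELETE rewrites every data file containing at least one matching row, omitting those rows, and leaves all other files untouched. A toy Python sketch of that semantics (all names illustrative, not the plugin's or Iceberg's API):

```python
# Toy model of copy-on-write DELETE: data files holding at least one
# matching row are rewritten without those rows; untouched files are
# reused as-is. Names are illustrative, not spark-rapids or Iceberg APIs.

def cow_delete(data_files, predicate):
    """Return a new file list with predicate-matching rows removed."""
    new_files = []
    for f in data_files:
        if any(predicate(row) for row in f):
            rewritten = [row for row in f if not predicate(row)]
            if rewritten:                # drop files that become empty
                new_files.append(rewritten)
        else:
            new_files.append(f)          # file untouched, kept as-is
    return new_files

files = [[1, 2, 3], [4, 5, 6], [7, 8, 9]]
result = cow_delete(files, lambda x: x % 5 == 0)
print(result)  # [[1, 2, 3], [4, 6], [7, 8, 9]]
```

The data-intensive part is the per-file rewrite, which is what this PR moves to the GPU; the surrounding commit logic stays on the CPU side.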
Key Changes:
- Added `GpuReplaceDataExec` and associated meta classes to handle copy-on-write DELETE operations
- Implemented `GpuCopyOnWriteOperation` batch write wrapper and `GpuSparkCopyOnWriteScan` for reading data during DELETE
- Extended Iceberg provider infrastructure to support the new execution path
- Added comprehensive integration tests covering various DELETE scenarios and fallback conditions
Reviewed Changes
Copilot reviewed 14 out of 14 changed files in this pull request and generated 2 comments.
| File | Description |
|---|---|
| tools/generated_files/356/supportedExecs.csv | Added ReplaceDataExec to supported operations list |
| tools/generated_files/356/operatorsScore.csv | Added performance score for ReplaceDataExec operator |
| sql-plugin/src/main/spark350/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala | Implemented GpuReplaceDataExec physical plan node |
| sql-plugin/src/main/spark350/scala/com/nvidia/spark/rapids/shims/Spark350PlusNonDBShims.scala | Registered ReplaceDataExec in supported operations map |
| sql-plugin/src/main/scala/org/apache/spark/sql/rapids/ExternalSource.scala | Added tagging and conversion methods for ReplaceDataExec |
| sql-plugin/src/main/scala/com/nvidia/spark/rapids/v2WriteCommandMetas.scala | Created ReplaceDataExecMeta for metadata handling |
| sql-plugin/src/main/scala/com/nvidia/spark/rapids/iceberg/IcebergProvider.scala | Updated interface to support ReplaceDataExec operations and changed scan class reference |
| integration_tests/src/main/python/iceberg/iceberg_delete_test.py | Added comprehensive DELETE operation tests including fallback scenarios |
| iceberg/src/main/scala/org/apache/iceberg/spark/source/write.scala | Implemented GpuCopyOnWriteOperation for batch write delegation |
| iceberg/src/main/scala/org/apache/iceberg/spark/source/GpuSparkWrite.scala | Added CopyOnWriteOperation handling in batch write conversion |
| iceberg/src/main/scala/org/apache/iceberg/spark/source/GpuSparkScan.scala | Refactored scan conversion logic and moved utility methods to companion object |
| iceberg/src/main/scala/org/apache/iceberg/spark/source/GpuSparkCopyOnWriteScan.scala | Created new scan implementation for copy-on-write operations |
| iceberg/src/main/scala/org/apache/iceberg/spark/source/GpuSparkBatchQueryScan.scala | Moved shared utility methods to parent class companion object |
| iceberg/src/main/scala/com/nvidia/spark/rapids/iceberg/IcebergProviderImpl.scala | Implemented ReplaceDataExec tagging and GPU conversion logic |
Resolved review threads:
- sql-plugin/src/main/scala/com/nvidia/spark/rapids/iceberg/IcebergProvider.scala
- iceberg/src/main/scala/org/apache/iceberg/spark/source/GpuSparkScan.scala (outdated)
Greptile Overview
Greptile Summary
This PR adds GPU acceleration for Iceberg DELETE statements operating in copy-on-write mode. The implementation extends the existing RAPIDS Iceberg integration by introducing ReplaceDataExec support across the plugin architecture: registering the execution node in the Spark 3.5.0+ shim (Spark350PlusNonDBShims.scala), creating a corresponding meta class (ReplaceDataExecMeta) that delegates tagging and conversion to the Iceberg provider, and implementing GPU-accelerated scan (GpuSparkCopyOnWriteScan) and write (GpuCopyOnWriteOperation) components. The architecture mirrors existing write operations (AppendData, OverwriteByExpression) by wrapping CPU implementations and delegating complex commit/transaction logic while GPU-accelerating the data-intensive file rewriting. A comprehensive test suite validates correctness across partitioned/unpartitioned tables and documents fallback scenarios.
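The wrap-and-delegate pattern this summary describes (the GPU wrapper takes over the data path while the wrapped CPU object keeps the commit/transaction logic) can be sketched in a few lines. Python stand-ins are used here; none of these names are actual spark-rapids classes:

```python
# Illustrative sketch of wrapping a CPU write: the GPU wrapper replaces
# the data-intensive step and forwards commit to the wrapped CPU object
# rather than reimplementing it. All names are hypothetical.

class CpuWrite:
    def write(self, rows):
        return [self._process(r) for r in rows]   # slow row-by-row path

    def _process(self, r):
        return r * 2

    def commit(self, results):
        return ("committed", len(results))        # transaction logic stays here

class GpuWriteWrapper:
    def __init__(self, cpu_write):
        self.cpu_write = cpu_write

    def write(self, rows):
        return [r * 2 for r in rows]              # stand-in for a batched GPU path

    def commit(self, results):
        return self.cpu_write.commit(results)     # delegate, do not reimplement

gpu = GpuWriteWrapper(CpuWrite())
out = gpu.write([1, 2, 3])
print(gpu.commit(out))  # ('committed', 3)
```

The design choice this mirrors: commit protocols are correctness-critical and rarely a bottleneck, so only the data movement is worth reimplementing on the GPU.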
PR Description Notes:
- Line 121 in `GpuSparkScan.scala` contains a typo: "current" should be "currently"
- Missing trailing newlines in `GpuSparkCopyOnWriteScan.scala` and `write.scala`
Potential Issues:
- Type Safety in `GpuSparkScan.isMetadataScan` (line 79): The method performs an unsafe cast `scan.asInstanceOf[SparkScan]` without validation. If a non-Iceberg `Scan` is passed (e.g., from another data source), this will throw a `ClassCastException` at runtime. Consider adding type checking:

  ```scala
  def isMetadataScan(scan: Scan): Boolean = {
    scan match {
      case sparkScan: SparkScan => sparkScan.table().isInstanceOf[BaseMetadataTable]
      case _ => false
    }
  }
  ```

- Potential Configuration Inconsistency: The `equals`/`hashCode` implementation in `GpuSparkCopyOnWriteScan` excludes `rapidsConf`, but includes it in `GpuSparkBatchQueryScan`. This inconsistency may cause subtle bugs if configuration differences affect scan behavior. Verify this is intentional or align both implementations.
- Test Data Reproducibility: `iceberg_delete_test.py` uses session-specific table operations (`with_cpu_session`) to compare results. If CPU and GPU sessions use different catalog configurations or isolation levels, results may diverge due to timing or visibility issues rather than actual correctness problems. Consider using a single session with different table names or explicit synchronization.
- Fallback Test Coverage Gap: Lines 153-161 in `iceberg_delete_test.py` mark `identity` and `bucket(8, _c6)` transforms as expected fallbacks, but line 123 shows `bucket(16, _c2)` executing successfully on GPU. The test should document why certain bucket transforms fail while others succeed (likely related to column types or bucket counts).
Confidence: 4/5
The implementation follows established patterns in the codebase and the changes are well-structured. The confidence score is 4 (not 5) due to the runtime type-safety concern in isMetadataScan and potential edge cases in the test configuration that could mask correctness issues.
12 files reviewed, 3 comments
Resolved review thread:
- iceberg/src/main/scala/org/apache/iceberg/spark/source/GpuSparkScan.scala
Greptile Overview
Greptile Summary
This review covers only the changes made since the last review, not the entire PR. The most recent updates refine the error message in GpuSparkScan.scala to accurately reflect support for both batch query and copy-on-write scans, addressing prior feedback. The error message now correctly states "batch query scan and copy-on-write scan" instead of just "batch query scan," aligning with the implementation that handles both SparkBatchQueryScan and SparkCopyOnWriteScan. This is a documentation-only change with no runtime behavior modification.
PR Description Notes:
- The PR title and description are grammatically inconsistent: "delete command for iceberg's copy on write mode" should be "delete command for Iceberg's copy-on-write mode" (capitalization and hyphenation).
Potential Issues
1. Risk: SparkPartitioningAwareScan base class expansion (flagged in the previous review, still unresolved)
The earlier review flagged that cpuScanClassName was changed from SparkBatchQueryScan to SparkPartitioningAwareScan (line 44 in IcebergProviderImpl.scala). This is a broader base class that may match unsupported scan subtypes beyond SparkBatchQueryScan and SparkCopyOnWriteScan. The code in GpuSparkScan.scala (lines 82-92) only has explicit cases for these two scan types, but if SparkPartitioningAwareScan includes other subtypes (e.g., incremental scans, metadata-only scans), the current implementation will throw the "unsupported" error without properly validating at the provider level. This could lead to incorrect fallback behavior or runtime exceptions.
Recommendation: In IcebergProviderImpl.scala, after loading SparkPartitioningAwareScan, add an explicit whitelist check that the scan instance is either SparkBatchQueryScan or SparkCopyOnWriteScan before returning `true` from `isSupportedScan()`. This ensures that broader base class matching doesn't inadvertently allow unsupported scan types through.
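A minimal illustration of that whitelist, using Python stand-in classes for the Scala scan hierarchy (the `SparkIncrementalScan` subtype below is hypothetical, included only to show what a broad base-class match would let through):

```python
# Stand-in class hierarchy mirroring the one discussed above. Only the
# two whitelisted subtypes should pass, even though every subtype
# inherits from the broad base class.

class SparkPartitioningAwareScan: ...
class SparkBatchQueryScan(SparkPartitioningAwareScan): ...
class SparkCopyOnWriteScan(SparkPartitioningAwareScan): ...
class SparkIncrementalScan(SparkPartitioningAwareScan): ...  # hypothetical

def is_supported_scan(scan):
    # Explicit whitelist: checking isinstance against the base class
    # alone would also admit SparkIncrementalScan.
    return isinstance(scan, (SparkBatchQueryScan, SparkCopyOnWriteScan))

print(is_supported_scan(SparkBatchQueryScan()))    # True
print(is_supported_scan(SparkIncrementalScan()))   # False
```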
2. Consistency: Error message update is incomplete
While the error message now mentions "copy-on-write scan," it doesn't align with the PR's full scope. The PR title and description reference "delete command for iceberg's copy on write mode," but the error message in GpuSparkScan.scala refers to "scan" types rather than command types. This creates potential confusion—DELETE operations in Iceberg COW mode use ReplaceDataExec (a write command) which internally triggers scans. The error message should clarify whether it's referring to read scan operations or write command operations to avoid misleading users who encounter the error during DELETE execution.
Recommendation: Consider prefixing the error message with context: "Currently GPU acceleration for Iceberg read operations only supports batch query scan and copy-on-write scan, but got..." to disambiguate scan-level vs. command-level support.
Confidence Score
4 out of 5
The error message update is straightforward and correct for its immediate scope, but the unresolved SparkPartitioningAwareScan base class issue from the previous review remains a moderate risk. The change is safe to merge if the team has verified that no other SparkPartitioningAwareScan subtypes exist in the supported Iceberg/Spark versions, but documentation of this assumption is missing.
9 files reviewed, 1 comment
```scala
case replaceData: ReplaceDataExec =>
  tagForGpu(replaceData, meta.asInstanceOf[ReplaceDataExecMeta])
```
logic: asInstanceOf cast assumes meta is ReplaceDataExecMeta, but only compile-time generic bounds exist. If caller passes mismatched meta type, this will throw ClassCastException at runtime. Consider adding a runtime type check or match guard. Is there a higher-level guard ensuring the meta type always matches the cpuExec type before calling these dispatchers?
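The cast-vs-guard distinction can be illustrated with a small Python analogue, where `isinstance` plays the role of a Scala match guard; all class names here are stand-ins:

```python
# Stand-in meta hierarchy. tag_unchecked mirrors a bare asInstanceOf:
# it assumes the type and fails opaquely on a mismatch. tag_guarded
# checks first and fails with a clear error.

class ExecMeta: ...
class ReplaceDataExecMeta(ExecMeta):
    def tag(self):
        return "tagged"
class OtherExecMeta(ExecMeta): ...

def tag_unchecked(meta):
    return meta.tag()          # analogue of asInstanceOf: blows up on a mismatch

def tag_guarded(meta):
    if isinstance(meta, ReplaceDataExecMeta):
        return meta.tag()
    raise TypeError(f"expected ReplaceDataExecMeta, got {type(meta).__name__}")

print(tag_guarded(ReplaceDataExecMeta()))  # tagged
try:
    tag_guarded(OtherExecMeta())
except TypeError as e:
    print(e)                   # expected ReplaceDataExecMeta, got OtherExecMeta
```

If a higher-level guard already pairs each meta type with its exec type, the cast is safe in practice; the guarded form just makes a future mismatch fail with a diagnosable message instead of a raw `ClassCastException`.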
Resolved review threads:
- sql-plugin/src/main/spark350/scala/com/nvidia/spark/rapids/shims/v2WriteCommandMetasShim.scala
- sql-plugin/src/main/spark350/scala/com/nvidia/spark/rapids/shims/ExternalSourceShim.scala
LGTM, only some comments for Shim code.

build

See #13652
Greptile Overview
Greptile Summary
This review covers only the changes made since the last review, not the entire PR.
The developer has addressed all previous feedback by: (1) reverting the cpuScanClassName change back to the original cpuBatchQueryScanClassName to maintain precise scan-type matching, (2) updating the error message in GpuSparkScan.tryConvert to reflect support for both batch query and copy-on-write scans, and (3) removing the unsafe cast warning comment (implicitly addressed by keeping the code structure unchanged). The changes are minimal and surgical—only two lines modified across two files—demonstrating a focused response to reviewer concerns without introducing new risk.
Important Files Changed
| Filename | Score | Overview |
|---|---|---|
| sql-plugin/src/main/scala/org/apache/spark/sql/rapids/ExternalSource.scala | 5/5 | Reverted CPU scan class name from cpuScanClassName to cpuBatchQueryScanClassName to maintain precise type matching |
| iceberg/src/main/scala/org/apache/iceberg/spark/source/GpuSparkScan.scala | 5/5 | Updated error message to reflect expanded scan support for both batch query and copy-on-write operations |
Confidence score: 5/5
- This PR is safe to merge with minimal risk
- Score reflects that these are simple documentation and naming consistency fixes directly addressing previous review feedback, with no logic changes or new functionality introduced
- No files require special attention; both changes are straightforward corrections
15 files reviewed, 7 comments
```scala
  }

  override def filter(filters: Array[Filter]): Unit = cpuScan.filter(filters)
}
```
No newline at end of file
style: Missing newline at end of file
```scala
override def equals(obj: Any): Boolean = {
  obj match {
    case that: GpuSparkCopyOnWriteScan =>
      this.cpuScan == that.cpuScan &&
        this.queryUsesInputFile == that.queryUsesInputFile
    case _ => false
  }
}
```
logic: equals should also compare rapidsConf to maintain consistency with the class constructor parameters. Two instances with identical cpuScan and queryUsesInputFile but different rapidsConf would incorrectly be considered equal. Are there scenarios where two scans should be considered equal even if they have different RapidsConf instances?
```scala
override def hashCode(): Int = {
  Objects.hash(cpuScan, Boolean.box(queryUsesInputFile))
}
```
logic: hashCode should include rapidsConf to match the equals implementation and satisfy the equals-hashCode contract
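The contract at stake: objects that compare equal must produce equal hashes, and any field consulted by equality should feed the hash. A minimal Python demonstration, with stand-in fields for `cpuScan`, `queryUsesInputFile`, and `rapidsConf`:

```python
# Demonstrates aligning __eq__ and __hash__ over the same fields so the
# equals-hashCode contract holds. Field names are stand-ins for the
# Scala scan's constructor parameters.

class Scan:
    def __init__(self, cpu_scan, uses_input_file, conf):
        self.cpu_scan = cpu_scan
        self.uses_input_file = uses_input_file
        self.conf = conf

    def __eq__(self, other):
        return (isinstance(other, Scan)
                and self.cpu_scan == other.cpu_scan
                and self.uses_input_file == other.uses_input_file
                and self.conf == other.conf)        # conf included in equality

    def __hash__(self):
        # Hash over exactly the fields __eq__ compares.
        return hash((self.cpu_scan, self.uses_input_file, self.conf))

a = Scan("s1", True, "confA")
b = Scan("s1", True, "confA")
c = Scan("s1", True, "confB")
print(a == b, hash(a) == hash(b))  # True True
print(a == c)                      # False: differing conf breaks equality
print(len({a, b, c}))              # 2
```

Whichever way the team resolves the `rapidsConf` question, the key point is that `equals` and `hashCode` must agree on the field set, or hash-based collections will treat equal scans as distinct (or vice versa).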
```scala
gpuScan match {
  case Success(s) =>
    if (s.hasNestedType) {
      meta.willNotWorkOnGpu("Iceberg current doesn't support nested types")
```
syntax: typo: "current" should be "currently"
```python
def assert_gpu_fallback_write_sql(write_func,
        read_func,
        base_table_name,
        cpu_fallback_class_name_list,
        conf={}):
```
style: parameter alignment - read_func and base_table_name should align with opening paren on line 449
```python
    print('### CPU RUN ###')
    cpu_start = time.time()
    cpu_table_name = f'{base_table_name}_cpu'
```
style: inconsistent quoting style - use double quotes for consistency with line 465, or use raw f-string prefix
```scala
object ExternalSource extends ExternalSourceBase {
}
```
No newline at end of file
style: Add newline before closing brace to match repository formatting conventions
build
Fixes #13523.
Description
Support delete command for iceberg's copy on write mode
Checklists
(Please explain in the PR description how the new code paths are tested, such as names of the new/existing tests that cover them.)