[SPARK-56171][SQL] Enable V2 file write path for non-partitioned DataFrame API writes and delete FallBackFileSourceV2 #54998
Open
LuciferYang wants to merge 3 commits into apache:master from
Conversation
Contributor (Author):

Follow-up tickets: the currently recorded plan is https://issues.apache.org/jira/browse/SPARK-56170
Member:

Wow, this and many TODO IDs. Thank you for working on this area, @LuciferYang.
…Frame API writes and delete FallBackFileSourceV2

Key changes:
- `FileWrite`: added `partitionSchema`, `customPartitionLocations`, `dynamicPartitionOverwrite`, `isTruncate`; path creation and truncate logic; dynamic partition overwrite via `FileCommitProtocol`
- `FileTable`: `createFileWriteBuilder` with `SupportsDynamicOverwrite` and `SupportsTruncate`; capabilities now include `TRUNCATE` and `OVERWRITE_DYNAMIC`; `fileIndex` skips file existence checks when `userSpecifiedSchema` is provided (write path)
- All file format writes (Parquet, ORC, CSV, JSON, Text, Avro) use `createFileWriteBuilder` with partition/truncate/overwrite support
- `DataFrameWriter.lookupV2Provider`: enabled `FileDataSourceV2` for non-partitioned `Append` and `Overwrite` via `df.write.save(path)`
- `DataFrameWriter.insertInto`: V1 fallback for file sources (TODO: SPARK-56175)
- `DataFrameWriter.saveAsTable`: V1 fallback for file sources (TODO: SPARK-56230, needs `StagingTableCatalog`)
- `DataSourceV2Utils.getTableProvider`: V1 fallback for file sources (TODO: SPARK-56175)
- Removed the `FallBackFileSourceV2` rule
- `V2SessionCatalog.createTable`: V1 `FileFormat` data type validation
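The `DataFrameWriter.lookupV2Provider` gating described in the commit message reduces to a small predicate: take the V2 file write path only for non-partitioned `Append`/`Overwrite` when the format is not in the V1 fallback list. A minimal stand-alone sketch, using locally defined stand-in types rather than Spark's internals:

```scala
// Hypothetical sketch of the V2 write gating; `SaveMode` here is a local
// stand-in, not org.apache.spark.sql.SaveMode.
object V2WriteGating {
  sealed trait SaveMode
  case object Append extends SaveMode
  case object Overwrite extends SaveMode
  case object ErrorIfExists extends SaveMode
  case object Ignore extends SaveMode

  // V2 only for non-partitioned Append/Overwrite outside the V1 fallback list;
  // ErrorIfExists/Ignore and partitioned writes stay on V1 for now.
  def useV2FileWrite(mode: SaveMode, partitioned: Boolean, inV1List: Boolean): Boolean =
    !inV1List && !partitioned && (mode == Append || mode == Overwrite)
}
```

This is only an illustration of the decision table; the actual PR implements it inside `lookupV2Provider` and `DataSourceV2Utils.getTableProvider`.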
```scala
// Built-in file formats for write testing. Text is excluded
// because it only supports a single string column.
private val fileFormats = Seq("parquet", "orc", "json", "csv")
```
Member:

- Shall we revisit the comment, because `avro` is also excluded in this suite?
- Do we have the same test coverage for `avro`?
Contributor (Author):

There are quite a few loose threads here; let me handle them one by one :)
Contributor (Author):

Updated the comments and added some test cases for Avro to maintain the same scenario coverage as here. Their corresponding relationships are as follows:
| FileDataSourceV2WriteSuite test | Avro coverage |
|---|---|
| File write for multiple formats | AvroSuite test save and load |
| File write V1-vs-V2 comparison | SPARK-56171: Avro V2 write produces same results as V1 write |
| Partitioned file write | AvroSuite reading and writing partitioned data |
| Partitioned write V1-vs-V2 comparison | SPARK-56171: Avro V2 partitioned write produces same results as V1 |
| Multi-level partitioned write | SPARK-56171: Avro V2 multi-level partitioned write |
| Dynamic partition overwrite | SPARK-56171: Avro V2 dynamic partition overwrite |
| Dynamic partition overwrite V1-vs-V2 | SPARK-56171: Avro V2 dynamic partition overwrite produces same results as V1 |
| DataFrame API write (append + overwrite) | Covered by cache invalidation tests |
| DataFrame API partitioned write | Covered by multi-level partitioned test |
| DataFrame API write with compression | AvroSuite write with compression (738, 2350) |
| Catalog table INSERT INTO | SPARK-56171: Avro V2 catalog table INSERT INTO |
| Catalog table partitioned INSERT INTO | SPARK-56171: Avro V2 catalog table partitioned INSERT INTO |
| V2 cache invalidation on overwrite | SPARK-56171: Avro V2 cache invalidation on overwrite |
| V2 cache invalidation on append | SPARK-56171: Avro V2 cache invalidation on append |
| Cache invalidation on catalog table overwrite | SPARK-56171: Avro V2 cache invalidation on catalog table overwrite |
| CTAS | SPARK-56171: Avro V2 CTAS |
| Partitioned write to empty directory | SPARK-56171: Avro V2 partitioned write to empty directory |
| Partitioned overwrite to existing directory | SPARK-56171: Avro V2 partitioned overwrite to existing directory |
### What changes were proposed in this pull request?

Enable the V2 file write path for non-partitioned `df.write.mode("append"/"overwrite").save(path)` across all built-in file formats (Parquet, ORC, JSON, CSV, Text, Avro), and delete the now-redundant `FallBackFileSourceV2` analysis rule.

**Key changes**

- V2 write infrastructure (`FileTable`, `FileWrite`)
  - `FileTable.createFileWriteBuilder`: shared builder with `SupportsTruncate` and `SupportsDynamicOverwrite` capabilities
  - `FileWrite`: partition schema, truncation (overwrite), dynamic partition overwrite, schema validation (nested column name duplication, data type, collation in map keys)
  - `Write` and `Table` classes use `createFileWriteBuilder`
- Delete `FallBackFileSourceV2` from `BaseSessionStateBuilder`/`HiveSessionStateBuilder`
  - `USE_V1_SOURCE_LIST` (default: all formats) already prevents V2 file tables from being created, and the DataFrame API uses `AppendData`/`OverwriteByExpression` (not `InsertIntoStatement`)
- Cache invalidation (`DataSourceV2Strategy`): `recacheByPath` with `fileIndex.refresh()` for `FileTable` writes
- Error handling (`FileFormatDataWriter`): `writeAll` wraps errors with `TASK_WRITE_FAILED`
- V2 write gating (`DataFrameWriter`, `DataSourceV2Utils`)
  - `FileDataSourceV2` only for `Append`/`Overwrite` without `partitionBy`; fall back to V1 for `ErrorIfExists`/`Ignore` (TODO: SPARK-56174) and partitioned writes
  - `saveAsTable`/`insertInto`: V1 fallback for `FileDataSourceV2`
  - `DataSourceV2Utils.getTableProvider`: return `None` for `FileDataSourceV2` to prevent V2 catalog table loading until the remaining gaps are addressed
- Data type validation (`V2SessionCatalog`): `FileFormat.supportDataType` validation in the `createTable` fallback, ensuring `CREATE TABLE` with unsupported types is rejected consistently

### Why are the changes needed?

The V2 Data Source API provides a cleaner write path than V1's `InsertIntoHadoopFsRelationCommand`. Enabling V2 writes for built-in file formats is a step toward fully migrating file sources to V2 (SPARK-56170).

### Does this PR introduce any user-facing change?

No. With the default configuration, all file writes use V1. The V2 path activates only when a user explicitly clears `USE_V1_SOURCE_LIST` and uses `df.write.mode("append"/"overwrite").save(path)` without `partitionBy`.

### How was this patch tested?

- `FileDataSourceV2WriteSuite` (23 tests): V2 write correctness, V1/V2 result comparison, partitioned writes, dynamic partition overwrite, cache invalidation, DataFrame API modes, catalog table INSERT INTO, CTAS
- `AvroV2Suite` (13 tests): equivalent V2 write coverage for Avro (V1/V2 comparison, multi-level partitioned write, dynamic overwrite, cache invalidation, catalog INSERT INTO, CTAS, partitioned write to empty/existing directory)

### Was this patch authored or co-authored using generative AI tooling?

Generated-by: Claude Code 4.6
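For illustration, the opt-in condition described under "Does this PR introduce any user-facing change?" could be exercised roughly like this. A sketch only: it assumes a Spark build containing this change, a local `SparkSession`, and the existing `spark.sql.sources.useV1SourceList` config key behind `USE_V1_SOURCE_LIST`.

```scala
import org.apache.spark.sql.{SaveMode, SparkSession}

object V2WriteExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[1]")
      .appName("v2-file-write-sketch")
      .getOrCreate()

    // Clear the V1 fallback list so built-in file sources resolve to their
    // V2 implementations on the write path.
    spark.conf.set("spark.sql.sources.useV1SourceList", "")

    val path = java.nio.file.Files.createTempDirectory("v2-write").toString
    val df = spark.range(10).toDF("id")

    // Non-partitioned Append/Overwrite via the DataFrame API takes the V2
    // path (AppendData / OverwriteByExpression); adding partitionBy would
    // still fall back to V1 in this PR.
    df.write.mode(SaveMode.Overwrite).parquet(path)
    df.write.mode(SaveMode.Append).parquet(path)

    assert(spark.read.parquet(path).count() == 20)
    spark.stop()
  }
}
```

With the default configuration (all formats in `USE_V1_SOURCE_LIST`), the same code would go through the V1 `InsertIntoHadoopFsRelationCommand` path and produce the same results, which is what the V1-vs-V2 comparison tests verify.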