[SPARK-52482][SQL][CORE] ZStandard support for file data source reader #51182
sandip-db wants to merge 13 commits into apache:master from sandip-db/zstd-file-source-support
Conversation
Hadoop has built-in support for this. Even if you don't want to touch the Hadoop code, this PR's approach looks like overkill; Hadoop already provides this.
This logic is subtle, why is this fallback required?
createInputStream may fail with an exception for ZSTD if Hadoop is not compiled with ZSTD support. In that case, we try to use Spark's ZStandard codec if it is enabled. Added a comment to clarify this.
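The fallback order described here can be sketched as follows. This is a minimal, hypothetical Python sketch rather than the PR's actual Scala code; `open_with_hadoop_codec` and `open_with_spark_zstd` are stand-ins for the real code paths:

```python
def open_with_hadoop_codec(path):
    # Stand-in for Hadoop's codec.createInputStream; raises when Hadoop
    # was not compiled with native ZSTD support.
    raise RuntimeError("native-zstd library not available")

def open_with_spark_zstd(path):
    # Stand-in for decompressing via Spark's built-in ZStdCompressionCodec.
    return f"spark-zstd:{path}"

def open_compressed(path, spark_zstd_enabled=True):
    """Try the Hadoop codec first; fall back to Spark's codec only for
    ZSTD files and only when the fallback is enabled."""
    try:
        return open_with_hadoop_codec(path)
    except Exception:
        if spark_zstd_enabled and path.endswith((".zst", ".zstd")):
            return open_with_spark_zstd(path)
        raise

print(open_compressed("data.json.zst"))  # spark-zstd:data.json.zst
```

The point of the guard is that the exception-driven fallback only engages for ZSTD suffixes; any other codec failure still surfaces to the caller.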
@pan3793 Thanks for your comment. The PR adds fewer than 50 lines of code to re-use Spark's existing codec.
@pan3793 does Hadoop's
shall we put the test in FileBasedDataSourceSuite, so that it's easier to share code and test different formats?
I thought about this. However, each format has some special cases that require generating the files slightly differently:
- TextFileFormat has wholeText option that reads the whole file
- CSV has header option
- Json has single and multiline options
@cloud-fan It's possible to pass different codecs via
@cloud-fan as @sandip-db said, Hadoop's codec selection is driven by the filename suffix. How do you define the behavior of "specify the compression at the session level"? Always respect the session conf and ignore the filename suffix? Or fall back to the codec suggested by the session conf when something goes wrong? Also, please be careful that a Hadoop codec may behave differently from the Spark/Unix tool codec, for example HADOOP-12990 (lz4).
@sandip-db to handle "non-standard extensions", you just need to register another Hadoop codec. As for "no extension" compressed text files, I'm not sure that is a valid use case (see my last comment).
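For illustration, the suffix-to-codec lookup that extension-based selection relies on could be sketched like this. This is a hypothetical Python sketch, not Hadoop's actual `CompressionCodecFactory`; the alias entries mirror the non-standard `.gzip`/`.zstd` extensions this PR adds:

```python
# Hypothetical suffix table; Hadoop's real lookup (CompressionCodecFactory)
# normally knows only each codec's default extension (e.g. ".gz", ".zst").
CODEC_BY_EXTENSION = {
    ".gz": "gzip",
    ".gzip": "gzip",   # non-standard alias supported by this PR
    ".zst": "zstd",
    ".zstd": "zstd",   # non-standard alias supported by this PR
    ".bz2": "bzip2",
}

def codec_for(path):
    """Pick a codec from the filename suffix; None means read as plain text."""
    for ext, codec in CODEC_BY_EXTENSION.items():
        if path.endswith(ext):
            return codec
    return None  # a compressed file with no extension is missed here

print(codec_for("events.json.zstd"))  # zstd
print(codec_for("events.json"))       # None
```

The last branch shows the limitation being debated: a purely suffix-based scheme has no answer for extensionless compressed files.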
@pan3793 While uncommon, we come across users who have compressed files without any extension. Most recently, a user had to rename a billion gzip-compressed files in S3 because they had no extension.
Thanks for bringing this to my attention. We are not adding support for ZSTD compression using the Spark codec yet; this PR just adds decompression support. @cloud-fan @pan3793 do you have any further concerns with this PR? Can it be merged?
@sandip-db TBH, the current approach (control flow based on catching exceptions) seems too hacky, and I'd like to see a more detailed design for your next steps. (I don't think the question below got answered.)
You still need to ensure that Spark's zstd codec is compatible with Hadoop's implementation. In my experience, using the AirCompressor LZO codec to decompress files written via hadoop-lzo may randomly produce corrupt content with no errors.
Instead of relying on the filename suffix or a Spark session conf to choose the codec, I wonder if the magic number was considered?
@pan3793 Thanks for your input.
Users should be able to specify the compression type in their query using a data source reader option. If this option is specified, Spark will always use the compression type specified by the user. There are other alternatives available, like first using the file path extension, or using the magic number (as the file utility does) to determine the codec type.
I agree, and I wonder why Hadoop didn't do this in the first place. There will be some performance penalty for examining the magic number and reopening the file input stream with the appropriate codec. Having said that, we can take the discussion of the compression option and the use of the magic number to my next PR. The current PR is about adding ZSTD decompression support; I would appreciate it if we can close this first. The try-catch logic picks the codec in order of preference: Hadoop's codec first, then Spark's ZStandard codec as a fallback.
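For reference, magic-number sniffing of the kind discussed here could look like the following. This is an illustrative Python sketch, not code from this PR; the byte constants are the well-known magic numbers of each container format (zstd frames start with `28 B5 2F FD`, gzip with `1F 8B`):

```python
# Leading bytes of each container format (well-known magic numbers).
MAGIC_NUMBERS = {
    b"\x28\xb5\x2f\xfd": "zstd",   # zstd frame magic, little-endian 0xFD2FB528
    b"\x1f\x8b": "gzip",
    b"BZh": "bzip2",
}

def sniff_codec(header):
    """Return the codec name implied by a file's leading bytes, or None."""
    for magic, codec in MAGIC_NUMBERS.items():
        if header.startswith(magic):
            return codec
    return None

print(sniff_codec(b"\x28\xb5\x2f\xfd" + b"frame data"))  # zstd
```

A wrapper codec could buffer these few leading bytes and hand them back to the concrete codec's input stream, which would avoid a second open of the file.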
For the compatibility check, I have added some more tests that read files compressed with zstd on Ubuntu (version: 1.4.4+dfsg-3ubuntu0.1).
OK, it's fair enough to fork Hadoop's `LineRecordReader`.
Having a wrapper codec that looks ahead at the magic number and hands the InputStream to the concrete codec should eliminate the re-open cost; anyway, this is an implementation detail and can be discussed later.
I think you should at least test reading a zstd text file written by Hadoop.
Added a test scenario with a file compressed using Hadoop's native ZSTD codec.
thanks, merging to master!
@sandip-db I just found an incompatibility between zstd-jni and Hadoop native zstd reported by the HBase community (HBASE-27706); not sure if it will affect Spark cases.
@sandip-db I know that, but since this is related to storage, we should be careful about possible incompatibilities.
Closes apache#51182 from sandip-db/zstd-file-source-support.
Authored-by: Sandip Agarwala <131817656+sandip-db@users.noreply.github.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
@sandip-db is there any plan for future work to allow for this read pattern?
I'm not seeing any other currently supported way to achieve such a result.
What changes were proposed in this pull request?
The current Hadoop version in Spark hasn't been compiled with ZSTD, because of which ZSTD-compressed JSON, CSV, text, and XML files cannot be read. Spark has a built-in `ZStdCompressionCodec`, which is used for compressing shuffle data and other use cases. This PR adds support for ZStandard decompression in file data source readers using the built-in `ZStdCompressionCodec`. It also adds support for some non-standard extensions like `.gzip` and `.zstd`.

The first commit in this PR is a vanilla copy of Hadoop's `LineRecordReader`. This was needed to add support for non-Hadoop-based codecs. Reviewers need not review this commit.

The following config has been added to enable/disable the imported Hadoop `LineRecordReader`:

- `spark.sql.execution.datasources.hadoopLineRecordReader.enabled`: Setting this SQLConf to false will result in using Hadoop's `LineRecordReader`. Otherwise, the inlined `HadoopLineRecordReader` is used by default.

Why are the changes needed?
Same as above.
Does this PR introduce any user-facing change?
Yes. Adds support for ZSTD decompression in file data source readers.
How was this patch tested?
Added new unit tests
Was this patch authored or co-authored using generative AI tooling?
No