HADOOP-1593. [ABFS] Add vectored read support in ABFS driver #8400
anmolanmol1234 wants to merge 31 commits into apache:trunk from
… HADOOP-15963_poc
🎊 +1 overall
This message was automatically generated.
💔 -1 overall
This message was automatically generated.
| * completed in doneReading once actual bytes are known. | ||
| * Short-read safety is enforced there via per-unit coverage check. | ||
| */ | ||
| return true; |
This is not only the READING_IN_PROGRESS case, right? If the buffer is UNAVAILABLE (queued but not yet picked up by any thread), it would also land here.
Yes, that case also lands here; I will update the comments. In doneReading we validate that the buffer has read the data needed for the vectored case.
| * applied later in doneReading before dispatching completion. | ||
| */ | ||
| long end = existing.getOffset() + ( | ||
| existing.getStatus() == ReadBufferStatus.AVAILABLE |
So we would wait and attach the buffer even if it has not yet been picked up by a thread (unavailable state)?
doneReading is where the actual handling of vectored reads happens, so if the buffer is already queued it is safe to attach the unit here and do the handling later in doneReading.
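For readers following this thread, the per-unit coverage check mentioned in the code comments can be sketched roughly as below. This is a minimal illustration, not the code in this PR: the class FanOutSketch, the method slice, and its parameters are hypothetical names. It assumes the shared buffer holds bytesRead valid bytes starting at file offset bufferStart, and that a unit is only served when the buffer fully covers it.

```java
import java.nio.ByteBuffer;
import java.util.function.IntFunction;

/** Hypothetical illustration only; not the ABFS implementation. */
final class FanOutSketch {

  /**
   * Copies the file range [rangeOffset, rangeOffset + rangeLength) out of a
   * shared buffer that holds bytesRead valid bytes starting at bufferStart.
   * Returns null when the buffer does not fully cover the range, which is
   * how this sketch models a short read.
   */
  static ByteBuffer slice(byte[] data, long bufferStart, int bytesRead,
      long rangeOffset, int rangeLength, IntFunction<ByteBuffer> allocator) {
    long bufferEnd = bufferStart + bytesRead;
    // Coverage check: the range must lie entirely inside the bytes read.
    if (rangeOffset < bufferStart || rangeOffset + rangeLength > bufferEnd) {
      return null;
    }
    // Allocate the caller-visible buffer with the allocator captured earlier.
    ByteBuffer dest = allocator.apply(rangeLength);
    dest.put(data, (int) (rangeOffset - bufferStart), rangeLength);
    dest.flip(); // make the copied bytes readable from position 0
    return dest;
  }
}
```

Because the check uses the number of bytes actually read, it can only run once the read has completed, which matches the explanation that the handling is deferred to doneReading.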
| return System.identityHashCode(range); | ||
| } | ||
| } | ||
| } No newline at end of file |
| @@ -1078,7 +1105,7 @@ public int minSeekForVectorReads() { | |||
| */ | |||
| @Override | |||
| public int maxReadSizeForVectorReads() { | |||
The method name mentions the read size, but we are returning the max gap size.
This is the name of the method in the superclass.
| buffer.getPath(), r.getOffset(), destOffset, length, left); | ||
| if (left < 0) { | ||
| LOG.error("fanOut: pending bytes went negative possible duplicate write:" |
Nit: there is a non-printable character in this log message.
| if (end >= unit.getOffset() + unit.getLength()) { | ||
| existing.setBufferType(BufferType.VECTORED); | ||
| existing.addVectoredUnit(unit); | ||
| existing.setAllocator(allocator); |
Could we have multiple readVectored calls with overlapping ranges that reset the allocator here? Or will the isAlreadyQueued section not be reached for overlapping ranges?
Overlapping ranges are not allowed in vectored reads. validateAndSortRanges in the VectoredReadUtils class takes care of this.
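As a rough illustration of what such a validation step does, the sketch below sorts ranges by offset and rejects any pair that overlaps. It is not the Hadoop VectoredReadUtils code: the class OverlapCheckSketch, the method sortAndRejectOverlaps, and the {offset, length} array encoding are assumptions made for this example. Note that a check of this shape only covers the ranges passed in one list.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

/** Hypothetical illustration only; not Hadoop's VectoredReadUtils. */
final class OverlapCheckSketch {

  /**
   * Each range is encoded as {offset, length}. Returns a copy sorted by
   * offset, or throws if any two ranges in the list overlap.
   */
  static List<long[]> sortAndRejectOverlaps(List<long[]> ranges) {
    List<long[]> sorted = new ArrayList<>(ranges);
    sorted.sort(Comparator.comparingLong(r -> r[0]));
    for (int i = 1; i < sorted.size(); i++) {
      long[] prev = sorted.get(i - 1);
      long[] cur = sorted.get(i);
      // After sorting, an overlap exists if a range starts before the
      // previous range ends.
      if (cur[0] < prev[0] + prev[1]) {
        throw new IllegalArgumentException(
            "Overlapping ranges at offsets " + prev[0] + " and " + cur[0]);
      }
    }
    return sorted;
  }
}
```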
| return bufferManager; | ||
| } | ||
| VectoredReadHandler getVectoredReadHandler() { |
| long bufferEnd = bufferStart + bytesRead; | ||
| /* Iterate over all combined logical units mapped to this buffer */ | ||
| for (CombinedFileRange unit : units) { |
Could the following scenario be possible: while we fan out here, another vectored read call comes in with ranges overlapping those shared by this buffer and gets attached to the same buffer as a unit?
Overlapping ranges are not allowed. List<? extends FileRange> sortedRanges = VectoredReadUtils.validateAndSortRanges(ranges, Optional.of(fileLength)); takes care of that.
| */ | ||
| if (isAlreadyQueued(stream.getETag(), unit.getOffset())) { | ||
| ReadBuffer existing = findQueuedBuffer(stream, unit.getOffset()); | ||
| if (existing != null && existing.getStream().getETag() != null && stream.getETag() |
Same doubt as in RBMV1: do we also wait for read buffers in the UNAVAILABLE state?
Yes, addressed above.
| * @param abfsConfiguration the configuration to set for the ReadBufferManagerV2. | ||
| */ | ||
| public static void setReadBufferManagerConfigs(final int readAheadBlockSize, | ||
| public static void setReadBufferManagerConfigs(int readAheadBlockSize, |
| return isFirstByteConsumed() && isLastByteConsumed(); | ||
| } | ||
| void addVectoredUnit(CombinedFileRange u) { |
Please add Javadoc for all the newly created methods.
| // Allocator used for vectored fan-out; captured at queue time */ | ||
| private IntFunction<ByteBuffer> allocator; | ||
| // Tracks whether fanOut has already been executed | ||
| private final AtomicInteger fanOutDone = new AtomicInteger(0); |
Would it be better to keep fanOutDone as AtomicBoolean instead of AtomicInteger? We don't have to compare the value in isFanOutDone() in that case.
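A minimal sketch of what the AtomicBoolean variant could look like, assuming the flag only needs to guarantee that fan-out runs once. The class FanOutGuardSketch and the method tryFanOut are hypothetical names for illustration; only fanOutDone and isFanOutDone come from this thread.

```java
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical illustration only; not the ReadBuffer class in this PR. */
final class FanOutGuardSketch {

  // Tracks whether fan-out has already been claimed by some thread.
  private final AtomicBoolean fanOutDone = new AtomicBoolean(false);

  /**
   * Runs the given fan-out action at most once across all callers.
   * Returns true only for the caller that actually ran it.
   */
  boolean tryFanOut(Runnable fanOut) {
    // compareAndSet flips false -> true atomically; losers see false here.
    if (!fanOutDone.compareAndSet(false, true)) {
      return false;
    }
    fanOut.run();
    return true;
  }

  /** No value comparison needed, unlike an AtomicInteger counter. */
  boolean isFanOutDone() {
    return fanOutDone.get();
  }
}
```

One design point to keep in mind: in this sketch the flag is set before the action runs, so isFanOutDone() means "fan-out has been claimed", not necessarily "fan-out has finished".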
| ReadBuffer findInList(final Collection<ReadBuffer> buffers, | ||
| final AbfsInputStream stream, long requestedOffset) { | ||
| for (ReadBuffer buffer : buffers) { | ||
| if (buffer.getStream() == stream |
Can buffer be null? If so, it will result in a NullPointerException here.
No, buffer can't be null here.
@steveloughran @mukund-thakur, requesting you to kindly review the PR. Thanks.
This PR introduces vectored read support in the Azure Blob File System (ABFS) driver to improve read performance for workloads that issue multiple small, non-contiguous read requests.
Vectored reads batch multiple read ranges into fewer network calls, which reduces request overhead and improves throughput. This is especially beneficial for analytics engines like Spark.
The current ABFS read implementation performs a sequential, independent read operation for each requested range. This leads to:
- An increased number of network calls
- Higher latency for small or random reads
- Inefficient utilization of bandwidth
Vectored I/O addresses these issues by coalescing multiple read requests into one or a few backend calls.
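The coalescing idea can be sketched as follows. This is a simplified illustration under stated assumptions, not the algorithm used by this PR or by Hadoop: the class CoalesceSketch, the Range type, and the parameters maxGap and maxMergedSize are hypothetical. It merges neighbouring ranges when the gap between them is small enough and the merged read stays under a size limit.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

/** Hypothetical illustration only; not the ABFS or Hadoop implementation. */
final class CoalesceSketch {

  /** A half-open byte range [offset, offset + length). */
  static final class Range {
    final long offset;
    final int length;

    Range(long offset, int length) {
      this.offset = offset;
      this.length = length;
    }

    long end() {
      return offset + length;
    }
  }

  /**
   * Merges ranges whose gap is at most maxGap bytes, as long as the merged
   * range stays within maxMergedSize bytes. Input order does not matter.
   */
  static List<Range> coalesce(List<Range> ranges, int maxGap, int maxMergedSize) {
    List<Range> sorted = new ArrayList<>(ranges);
    sorted.sort(Comparator.comparingLong(r -> r.offset));
    List<Range> merged = new ArrayList<>();
    Range current = null;
    for (Range next : sorted) {
      if (current != null
          && next.offset - current.end() <= maxGap
          && Math.max(current.end(), next.end()) - current.offset <= maxMergedSize) {
        // Close enough and small enough: extend the current backend read.
        long newEnd = Math.max(current.end(), next.end());
        current = new Range(current.offset, (int) (newEnd - current.offset));
      } else {
        if (current != null) {
          merged.add(current);
        }
        current = next;
      }
    }
    if (current != null) {
      merged.add(current);
    }
    return merged;
  }

  public static void main(String[] args) {
    List<Range> in = List.of(new Range(0, 100), new Range(150, 50), new Range(10_000, 20));
    for (Range r : coalesce(in, 64, 4096)) {
      System.out.println("read offset=" + r.offset + " length=" + r.length);
    }
  }
}
```

In this example the first two ranges are 50 bytes apart, so they would be served by one backend read, while the distant third range gets its own read.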