diff --git a/.claude/sweep-performance-state.csv b/.claude/sweep-performance-state.csv index 84b8a8ab7..c001a51d3 100644 --- a/.claude/sweep-performance-state.csv +++ b/.claude/sweep-performance-state.csv @@ -18,12 +18,12 @@ fire,2026-03-31T18:00:00Z,SAFE,compute-bound,0,, flood,2026-03-31T18:00:00Z,SAFE,compute-bound,0,, focal,2026-05-29,SAFE,compute-bound,1,2734,"HIGH: _hotspots_dask_cupy chunk fn round-tripped each chunk host<->GPU (cupy.asnumpy classify cupy.asarray); fixed PR 2739 to reuse _run_gpu_hotspots on device. LOW (not fixed): _apply_numpy/_hotspots_cupy use zeros_like where empty would suffice. CUDA kernels regs<=62, no register-pressure issue." geodesic,2026-03-31T18:00:00Z,N/A,compute-bound,0,, -geotiff,2026-05-20,SAFE,IO-bound,0,2212,"Pass 13 (2026-05-20): 1 MEDIUM found and fixed. _nvjpeg_batch_encode (_gpu_decode.py:~L1560) and _nvjpeg2k_batch_encode (~L2958) called cupy.cuda.Device().synchronize() inside the per-tile encode loops, a whole-device fence that blocked every CUDA stream and serialised concurrent work (e.g. predictor encodes on other streams). The decode-side counterpart _try_nvjpeg_batch_decode already used cupy.cuda.Stream.null.synchronize() at L1442; the encoder side was inconsistent. Filed #2212 and fixed both encoders to use Stream.null.synchronize(), scoping the per-tile sync to the default stream the encode/retrieve calls were issued on. nvJPEG / nvJPEG2000 encoders maintain a single shared state per encoder so encodes within a batch are inherently serial; the fix removes the device-wide blocker without changing the API ordering contract. 5 new tests in test_nvjpeg_encode_stream_sync_2212.py (AST checks that neither encoder contains Device().synchronize() inside a for-loop, that both call Stream.null.synchronize() in the loop, and that the decoder reference pattern stays pinned). All 5 new tests + 19 existing related encode/decode tests pass. nvjpeg/nvjpeg2k shared libs not present on this host so end-to-end encode verification is gated; add cuda-unavailable-libs note to re-validate on a host with the RAPIDS conda env. SAFE/IO-bound verdict holds; no change in dask graph cost. Dask probe: 2560x2560 deflate-tiled file via read_geotiff_dask(chunks=256) yields 400 tasks for 100 chunks (4 tasks/chunk), well under the 50K cap. LOW deferred (no fix in this PR): _build_ifd called twice per IFD level in _assemble_standard_layout (_writer.py:1531+1543), _assemble_cog_layout (1582+1625), and the COG overview path (2519+2546+2740) -- the first call's bytes are discarded; only the overflow byte length is used to compute pixel_data_offset. Cost is bounded by IFD count (typically 1-5 overview levels) so absolute impact is minor. Pre-existing pattern. | Pass 12 (2026-05-18): 1 MEDIUM found and fixed. _try_nvjpeg2k_batch_decode at _gpu_decode.py:~L2725-2778 allocated per-tile per-component cupy.empty buffers (N*S round-trips through the cupy memory pool) and called cupy.cuda.Device().synchronize() once per tile, forcing default-stream serialisation that defeats nvJPEG2000's internal pipelining. Filed #2107 and fixed: pre-allocate a single d_comp_pool sized n_tiles*samples*tile_height*pitch under a _check_gpu_memory guard, derive per-tile/per-component views as slab offsets, and replace the per-tile sync with a single batch-end sync. Same pattern as #1659 (_try_nvcomp_from_device_bufs), #1688 (_try_kvikio_read_tiles), #1712 (_nvcomp_batch_compress). 7 new tests in test_nvjpeg2k_single_alloc_2107.py: AST-level structural assertions confirm no cupy.empty inside the for-loop and no Device().synchronize() inside the loop, plus pool/per_tile_comp_bytes presence and _check_gpu_memory guard checks; lib-absent short-circuit; unsupported-dtype cleanup contract; cupy-only pool slab-non-overlap test (gpu-marked). libnvjpeg2k.so not present on this host so the end-to-end nvJPEG2000 decode is gated -- note added to re-validate on a host with the RAPIDS conda env. All 30 jpeg2000/compression tests + 7 new tests pass. SAFE/IO-bound verdict holds (no change in dask graph cost). Dask probe: 4096x4096 deflate-tiled file via read_geotiff_dask(chunks=512) yields 256 tasks for 64 chunks (4 tasks/chunk), well under the 50K cap. | Pass 11 (2026-05-18): 1 MEDIUM found and fixed. _read_strips (_reader.py:~L1972) and _fetch_decode_cog_http_strips (_reader.py:~L2670) decoded strips sequentially in a Python for-loop while the tile counterparts (_read_tiles L2146, _fetch_decode_cog_http_tiles L2898) gated parallel decode on _PARALLEL_DECODE_PIXEL_THRESHOLD via ThreadPoolExecutor. Filed #2100 and fixed: both strip paths now collect jobs, parallel-decode when n_strips > 1 and strip_pixels >= 64K, then place sequentially. Measured (uint16, 4-core): 4096x4096 deflate 130ms->34ms (3.82x), 8192x8192 deflate 531ms->146ms (3.63x), 8192x8192 zstd 211ms->85ms (2.48x), uncompressed 25ms->22ms (1.14x). 5 new tests in test_parallel_strip_decode_2100.py (parallel/serial parity, pool-engaged on multi-strip, serial-path for single-strip, windowed cross-strip read, HTTP COG strip parity). 3998 tests pass; 8 pre-existing failures predating this change (predictor2 BE + size_param_validation_gpu_vrt reference now-private read_to_array attr). SAFE/IO-bound verdict holds. | Pass 10 (2026-05-15): 1 new MEDIUM found and fixed; 2 LOW noted. MEDIUM (_reader.py:2737): _fetch_decode_cog_http_tiles decoded tiles sequentially in a Python for-loop after the concurrent fetch landed (issue #1480). Local _read_tiles parallelises decode whenever tile_pixels >= 64K via ThreadPoolExecutor (_reader.py:2017); the HTTP path was structurally similar but never picked up the same gate, so wide windowed reads of multi-tile COGs left deflate/zstd decode single-threaded. Mirrored the local-path threshold + pool. 5 new tests in test_cog_http_parallel_decode_2026_05_15.py (parallel + serial round-trip correctness, pool-instantiation branch selection above the threshold, single-tile path skips the pool, structural _decode_strip_or_tile call count == n_tiles). All 262 COG/HTTP tests pass; 3162 of 3164 selected geotiff tests pass overall (2 pre-existing failures predating Pass 9 per prior notes -- test_predictor2_big_endian_gpu_1517 references the now-private read_to_array attr, and the test_size_param_validation_gpu_vrt_1776 tile_size=4 validator failure). LOW deferred (no fix in this PR): (1) _block_reduce_2d_gpu (_gpu_decode.py:3142/3163/3189) does bool(mask.any().item()) per overview level when nodata is set, paying one device sync per level; the alternative (unconditional cupy.putmask) always pays the work cost and the short-circuit is correct under the current API. (2) _nvcomp_batch_compress adler32 staging (_gpu_decode.py:2543-2546) issues n_tiles slice-assign kernels into a fresh contig buffer despite all callers passing slices of a single underlying d_tile_buf; an API refactor to accept the source buffer directly would skip the rebuild. SAFE/IO-bound verdict holds. Dask probe: 2560x2560 chunks=256 yields 400 tasks (4 per chunk), well under the 50000 cap. GPU probe: 1024x1024 float32 zstd read returns CuPy-backed in 236 ms with no host round-trip. | Rockout 2026-05-15: LOW filed #1934 -- _apply_nodata_mask_gpu used cupy.where (allocating); switched to cupy.putmask on the already-owned buffer (float path) and on the post-astype float64 buffer (int path). Saves one chunk-sized device allocation per call. 7 new tests in test_apply_nodata_mask_gpu_inplace_1934.py; 52 related nodata tests pass. | Pass 8 (2026-05-12): 1 new MEDIUM found and fixed. _assemble_standard_layout/_assemble_cog_layout returned bytes(bytearray), doubling peak memory transiently during eager writes. Filed #1756, fixed by returning the bytearray directly. Measured: 95 MB uint8 raster peak drops 202 MB -> 107 MB. _write_bytes / parse_header already accepted the buffer protocol so the change is transparent to callers. 6 new tests in test_assemble_layout_no_bytes_copy_1756.py. 2123 existing geotiff tests pass; the 10 unrelated failures (test_no_georef_windowed_coords_1710, test_predictor2_big_endian_gpu_1517) reference the now-private read_to_array attribute (commit 8adb749, issue #1708) and predate this change. SAFE/IO-bound verdict holds. | Pass 7 (2026-05-12): re-audit identified 4 MEDIUM findings, all real, all backed by microbenches. (1) unpack_bits sub-byte loops for bps=2/4/12 in _compression.py:836-878 were 100-200x slower than vectorised numpy (filed #1713, fixed in this branch: bps=4 2M pixels drops from 165ms to 3ms = 55x; bps=2/12 similar). (2) _write_vrt_tiled at __init__.py:1708 uses scheduler='synchronous' on independent tile writes; measured 33% slowdown on 256-tile zstd write vs threads scheduler (filed #1714, no fix yet). (3) _nvcomp_batch_compress at _gpu_decode.py:2522-2526 still does per-tile cupy.get().tobytes() despite #1552 / #1659 fixing the same pattern elsewhere; measured 45% reduction with concat+single get on n=1024 (filed #1712, no fix yet). (4) _nvcomp_batch_compress at _gpu_decode.py:2457 uses per-tile cupy.empty allocations; 1024 tiles 16KB drops from 4.7ms to 1.0ms with single contiguous + views (bundled into #1712). Cat 6 OOM verdict: SAFE/IO-bound holds -- read_geotiff_dask caps task count at _MAX_DASK_CHUNKS=50_000 and per-chunk memory is bounded by chunk size. _inflate_tiles_kernel resource usage on Ampere: 67 regs/thread, 2896B local/thread, 8192B shared/block (LZW kernel: 29 regs, 24576B shared) -- register pressure under control; high local memory in inflate is unavoidable (LZ77 state) but only thread 0 in each block uses it. | Pass 4 (2026-05-10): re-audit after #1559 (centralise attrs across all read backends). New _populate_attrs_from_geo_info helper at __init__.py:301 runs once per read, not per-chunk -- no perf impact. Probe: 2560x2560 deflate-tiled file opened via read_geotiff_dask yields 400 tasks (4 tasks/chunk for 100 chunks), well under 1M cap. read_geotiff_gpu(1024x1024) returns cupy.ndarray end-to-end with no host round-trip (226ms incl. write+decode). No new HIGH/MEDIUM findings. SAFE/IO-bound holds. | Pass 3 (2026-05-10): SAFE/IO-bound. Audited 4 perf commits: #1558 (in-place NaN writes on uniquely-owned buffers correct), #1556 (fp-predictor ngjit ~297us/tile for 256x256 float32), #1552 (single cupy.concatenate + one .get() for batched D2H at _gpu_decode.py:870-913), #1551 (parallel decode threshold >=65536px engages 256x256 default at _reader.py:1121). Bench: 8192x8192 f32 deflate+pred2 256-tile write 782ms; 4096x4096 f32 deflate read 83ms with parallel decode. Deferred LOW (none filed, all <10% MEDIUM threshold): _writer.py:459/1109 redundant .copy() before predictor encode (~1% per tile), _compression.py:280 lzw_decompress dst[:n].copy() (~2% per LZW tile decode), _writer.py:1419 seg_np.copy() before in-place NaN substitution (negligible, conditional path), _CloudSource.read_range opens fresh fsspec handle per range (pre-existing, predates audit scope). nvCOMP per-tile D2H batching break-even confirmed (variable sizes need staging buffer, no win). | Pass 3 (2026-05-10): audited f157746,39322c3,f23ec8f,1aac3b7. All 5 commits correct. Redundant .copy() in _writer.py:459,1109 and _compression.py:280 (1-2% overhead, LOW). _CloudSource.read_range() per-call open is pre-existing arch issue. No HIGH/MEDIUM regressions. SAFE. | re-audit 2026-05-02: 6 commits since 2026-04-16 (predictor=3 CPU encode/decode, GPU predictor stride fix, validate_tile_layout, BigTIFF LONG8 offsets, AREA_OR_POINT VRT, per-tile alloc guard). 1M dask chunk cap intact at __init__.py:948; adler32 batch transfer intact at _gpu_decode.py:1825. New code is metadata validation and dispatcher logic with no extra materialization or per-tile sync points. No HIGH/MEDIUM regressions. | Pass 5 (2026-05-12): re-audit identified MEDIUM in _gpu_decode.py:1577 _try_nvcomp_from_device_bufs: per-tile cupy.empty + trailing cupy.concatenate doubled peak VRAM and added serial concat. Filed #1659 and fixed to single-buffer + pointer offsets (matches LZW/deflate/host-buffer patterns at L1847/L1878/L1114). Microbench (alloc+concat overhead only, not full nvCOMP latency): n=256 tile_bytes=65536 drops 3.66ms->0.69ms, n=256 tile_bytes=262144 drops 8.18ms->0.13ms. Tests: 5 new tests in test_nvcomp_from_device_bufs_single_alloc_1659.py (codec short-circuit, no-lib short-circuit, memory-guard contract, real ZSTD round-trip via nvCOMP, structural single-buffer check). 1458 existing geotiff tests pass, 3 unrelated matplotlib/py3.14 failures pre-existing. SAFE/IO-bound verdict holds. | Pass 6 (2026-05-12): re-audit on top of #1659. New HIGH in _try_kvikio_read_tiles at _gpu_decode.py:941: per-tile cupy.empty() + blocking IOFuture.get() inside loop serialised GDS reads to ~1 outstanding pread, missed parallelism the kvikio worker pool was designed for, paid per-tile cupy.empty setup (matches #1659 anti-pattern in nvCOMP path), and lacked _check_gpu_memory guard. Filed #1688 and fixed to single contiguous buffer + batched submit + guard. Microbench with 8-worker pool simulation: 256 tiles@1ms latency drops 256ms->38.7ms (~6.6x); single-thread simulation 256ms->28.5ms (9x). Tests: 9 new tests in test_kvikio_batched_pread_1688.py (kvikio-absent path, single-buffer pointer arithmetic, submit-before-get ordering, memory guard, partial-read fallback, round-trip data, zero-size/all-sparse tiles). All 1577 geotiff tests pass except pre-existing matplotlib/py3.14 failures." +geotiff,2026-06-09,SAFE,IO-bound,0,3117,"Pass 14 (2026-06-09): MEDIUM found and fixed -- _write_streaming ran one dask .compute() per 256-row tile-row/strip, so a source chunk taller than the band re-executed once per band it overlapped (measured 2x at chunks=512, 4x at chunks=1024, whole upstream graph re-runs for computed pipelines). Filed #3117, fixed via _stream_row_bands: consecutive tile-rows/strips group into row bands sized by the source chunk-row span (one-chunk halo, #3007 accounting) under streaming_buffer_bytes; each band computes once and tiles/strips are carved from the materialised band. Wide rasters needing column segmentation keep the per-tile-row path. Post-fix per-chunk executions == 1 on the default read->write round trip. 5 new tests (TestRowBandRecompute3117 + _stream_row_bands unit); write/integration/parity suites pass (2195). LOW deferred (no fix): _read_geotiff_gpu_chunked parses header+all IFDs twice at graph build (_backends/gpu.py ~1367-1419, cap check then GDS probe; build-time only). GPU paths validated on-device this pass: eager gpu read returns cupy with parity, dask+GPU chunked read lazy (17 tasks/4 chunks) with parity; GPU writer full materialisation is documented intentional (streaming_buffer_bytes no-op). Read path keeps 50k-task graph cap; dask read probe 4 tasks/chunk. SAFE/IO-bound holds. | Pass 13 (2026-05-20): 1 MEDIUM found and fixed. _nvjpeg_batch_encode (_gpu_decode.py:~L1560) and _nvjpeg2k_batch_encode (~L2958) called cupy.cuda.Device().synchronize() inside the per-tile encode loops, a whole-device fence that blocked every CUDA stream and serialised concurrent work (e.g. predictor encodes on other streams). The decode-side counterpart _try_nvjpeg_batch_decode already used cupy.cuda.Stream.null.synchronize() at L1442; the encoder side was inconsistent. Filed #2212 and fixed both encoders to use Stream.null.synchronize(), scoping the per-tile sync to the default stream the encode/retrieve calls were issued on. nvJPEG / nvJPEG2000 encoders maintain a single shared state per encoder so encodes within a batch are inherently serial; the fix removes the device-wide blocker without changing the API ordering contract. 5 new tests in test_nvjpeg_encode_stream_sync_2212.py (AST checks that neither encoder contains Device().synchronize() inside a for-loop, that both call Stream.null.synchronize() in the loop, and that the decoder reference pattern stays pinned). All 5 new tests + 19 existing related encode/decode tests pass. nvjpeg/nvjpeg2k shared libs not present on this host so end-to-end encode verification is gated; add cuda-unavailable-libs note to re-validate on a host with the RAPIDS conda env. SAFE/IO-bound verdict holds; no change in dask graph cost. Dask probe: 2560x2560 deflate-tiled file via read_geotiff_dask(chunks=256) yields 400 tasks for 100 chunks (4 tasks/chunk), well under the 50K cap. LOW deferred (no fix in this PR): _build_ifd called twice per IFD level in _assemble_standard_layout (_writer.py:1531+1543), _assemble_cog_layout (1582+1625), and the COG overview path (2519+2546+2740) -- the first call's bytes are discarded; only the overflow byte length is used to compute pixel_data_offset. Cost is bounded by IFD count (typically 1-5 overview levels) so absolute impact is minor. Pre-existing pattern. | Pass 12 (2026-05-18): 1 MEDIUM found and fixed. _try_nvjpeg2k_batch_decode at _gpu_decode.py:~L2725-2778 allocated per-tile per-component cupy.empty buffers (N*S round-trips through the cupy memory pool) and called cupy.cuda.Device().synchronize() once per tile, forcing default-stream serialisation that defeats nvJPEG2000's internal pipelining. Filed #2107 and fixed: pre-allocate a single d_comp_pool sized n_tiles*samples*tile_height*pitch under a _check_gpu_memory guard, derive per-tile/per-component views as slab offsets, and replace the per-tile sync with a single batch-end sync. Same pattern as #1659 (_try_nvcomp_from_device_bufs), #1688 (_try_kvikio_read_tiles), #1712 (_nvcomp_batch_compress). 7 new tests in test_nvjpeg2k_single_alloc_2107.py: AST-level structural assertions confirm no cupy.empty inside the for-loop and no Device().synchronize() inside the loop, plus pool/per_tile_comp_bytes presence and _check_gpu_memory guard checks; lib-absent short-circuit; unsupported-dtype cleanup contract; cupy-only pool slab-non-overlap test (gpu-marked). libnvjpeg2k.so not present on this host so the end-to-end nvJPEG2000 decode is gated -- note added to re-validate on a host with the RAPIDS conda env. All 30 jpeg2000/compression tests + 7 new tests pass. SAFE/IO-bound verdict holds (no change in dask graph cost). Dask probe: 4096x4096 deflate-tiled file via read_geotiff_dask(chunks=512) yields 256 tasks for 64 chunks (4 tasks/chunk), well under the 50K cap. | Pass 11 (2026-05-18): 1 MEDIUM found and fixed. _read_strips (_reader.py:~L1972) and _fetch_decode_cog_http_strips (_reader.py:~L2670) decoded strips sequentially in a Python for-loop while the tile counterparts (_read_tiles L2146, _fetch_decode_cog_http_tiles L2898) gated parallel decode on _PARALLEL_DECODE_PIXEL_THRESHOLD via ThreadPoolExecutor. Filed #2100 and fixed: both strip paths now collect jobs, parallel-decode when n_strips > 1 and strip_pixels >= 64K, then place sequentially. Measured (uint16, 4-core): 4096x4096 deflate 130ms->34ms (3.82x), 8192x8192 deflate 531ms->146ms (3.63x), 8192x8192 zstd 211ms->85ms (2.48x), uncompressed 25ms->22ms (1.14x). 5 new tests in test_parallel_strip_decode_2100.py (parallel/serial parity, pool-engaged on multi-strip, serial-path for single-strip, windowed cross-strip read, HTTP COG strip parity). 3998 tests pass; 8 pre-existing failures predating this change (predictor2 BE + size_param_validation_gpu_vrt reference now-private read_to_array attr). SAFE/IO-bound verdict holds. | Pass 10 (2026-05-15): 1 new MEDIUM found and fixed; 2 LOW noted. MEDIUM (_reader.py:2737): _fetch_decode_cog_http_tiles decoded tiles sequentially in a Python for-loop after the concurrent fetch landed (issue #1480). Local _read_tiles parallelises decode whenever tile_pixels >= 64K via ThreadPoolExecutor (_reader.py:2017); the HTTP path was structurally similar but never picked up the same gate, so wide windowed reads of multi-tile COGs left deflate/zstd decode single-threaded. Mirrored the local-path threshold + pool. 5 new tests in test_cog_http_parallel_decode_2026_05_15.py (parallel + serial round-trip correctness, pool-instantiation branch selection above the threshold, single-tile path skips the pool, structural _decode_strip_or_tile call count == n_tiles). All 262 COG/HTTP tests pass; 3162 of 3164 selected geotiff tests pass overall (2 pre-existing failures predating Pass 9 per prior notes -- test_predictor2_big_endian_gpu_1517 references the now-private read_to_array attr, and the test_size_param_validation_gpu_vrt_1776 tile_size=4 validator failure). LOW deferred (no fix in this PR): (1) _block_reduce_2d_gpu (_gpu_decode.py:3142/3163/3189) does bool(mask.any().item()) per overview level when nodata is set, paying one device sync per level; the alternative (unconditional cupy.putmask) always pays the work cost and the short-circuit is correct under the current API. (2) _nvcomp_batch_compress adler32 staging (_gpu_decode.py:2543-2546) issues n_tiles slice-assign kernels into a fresh contig buffer despite all callers passing slices of a single underlying d_tile_buf; an API refactor to accept the source buffer directly would skip the rebuild. SAFE/IO-bound verdict holds. Dask probe: 2560x2560 chunks=256 yields 400 tasks (4 per chunk), well under the 50000 cap. GPU probe: 1024x1024 float32 zstd read returns CuPy-backed in 236 ms with no host round-trip. | Rockout 2026-05-15: LOW filed #1934 -- _apply_nodata_mask_gpu used cupy.where (allocating); switched to cupy.putmask on the already-owned buffer (float path) and on the post-astype float64 buffer (int path). Saves one chunk-sized device allocation per call. 7 new tests in test_apply_nodata_mask_gpu_inplace_1934.py; 52 related nodata tests pass. | Pass 8 (2026-05-12): 1 new MEDIUM found and fixed. _assemble_standard_layout/_assemble_cog_layout returned bytes(bytearray), doubling peak memory transiently during eager writes. Filed #1756, fixed by returning the bytearray directly. Measured: 95 MB uint8 raster peak drops 202 MB -> 107 MB. _write_bytes / parse_header already accepted the buffer protocol so the change is transparent to callers. 6 new tests in test_assemble_layout_no_bytes_copy_1756.py. 2123 existing geotiff tests pass; the 10 unrelated failures (test_no_georef_windowed_coords_1710, test_predictor2_big_endian_gpu_1517) reference the now-private read_to_array attribute (commit 8adb749, issue #1708) and predate this change. SAFE/IO-bound verdict holds. | Pass 7 (2026-05-12): re-audit identified 4 MEDIUM findings, all real, all backed by microbenches. (1) unpack_bits sub-byte loops for bps=2/4/12 in _compression.py:836-878 were 100-200x slower than vectorised numpy (filed #1713, fixed in this branch: bps=4 2M pixels drops from 165ms to 3ms = 55x; bps=2/12 similar). (2) _write_vrt_tiled at __init__.py:1708 uses scheduler='synchronous' on independent tile writes; measured 33% slowdown on 256-tile zstd write vs threads scheduler (filed #1714, no fix yet). (3) _nvcomp_batch_compress at _gpu_decode.py:2522-2526 still does per-tile cupy.get().tobytes() despite #1552 / #1659 fixing the same pattern elsewhere; measured 45% reduction with concat+single get on n=1024 (filed #1712, no fix yet). (4) _nvcomp_batch_compress at _gpu_decode.py:2457 uses per-tile cupy.empty allocations; 1024 tiles 16KB drops from 4.7ms to 1.0ms with single contiguous + views (bundled into #1712). Cat 6 OOM verdict: SAFE/IO-bound holds -- read_geotiff_dask caps task count at _MAX_DASK_CHUNKS=50_000 and per-chunk memory is bounded by chunk size. _inflate_tiles_kernel resource usage on Ampere: 67 regs/thread, 2896B local/thread, 8192B shared/block (LZW kernel: 29 regs, 24576B shared) -- register pressure under control; high local memory in inflate is unavoidable (LZ77 state) but only thread 0 in each block uses it. | Pass 4 (2026-05-10): re-audit after #1559 (centralise attrs across all read backends). New _populate_attrs_from_geo_info helper at __init__.py:301 runs once per read, not per-chunk -- no perf impact. Probe: 2560x2560 deflate-tiled file opened via read_geotiff_dask yields 400 tasks (4 tasks/chunk for 100 chunks), well under 1M cap. read_geotiff_gpu(1024x1024) returns cupy.ndarray end-to-end with no host round-trip (226ms incl. write+decode). No new HIGH/MEDIUM findings. SAFE/IO-bound holds. | Pass 3 (2026-05-10): SAFE/IO-bound. Audited 4 perf commits: #1558 (in-place NaN writes on uniquely-owned buffers correct), #1556 (fp-predictor ngjit ~297us/tile for 256x256 float32), #1552 (single cupy.concatenate + one .get() for batched D2H at _gpu_decode.py:870-913), #1551 (parallel decode threshold >=65536px engages 256x256 default at _reader.py:1121). Bench: 8192x8192 f32 deflate+pred2 256-tile write 782ms; 4096x4096 f32 deflate read 83ms with parallel decode. Deferred LOW (none filed, all <10% MEDIUM threshold): _writer.py:459/1109 redundant .copy() before predictor encode (~1% per tile), _compression.py:280 lzw_decompress dst[:n].copy() (~2% per LZW tile decode), _writer.py:1419 seg_np.copy() before in-place NaN substitution (negligible, conditional path), _CloudSource.read_range opens fresh fsspec handle per range (pre-existing, predates audit scope). nvCOMP per-tile D2H batching break-even confirmed (variable sizes need staging buffer, no win). | Pass 3 (2026-05-10): audited f157746,39322c3,f23ec8f,1aac3b7. All 5 commits correct. Redundant .copy() in _writer.py:459,1109 and _compression.py:280 (1-2% overhead, LOW). _CloudSource.read_range() per-call open is pre-existing arch issue. No HIGH/MEDIUM regressions. SAFE. | re-audit 2026-05-02: 6 commits since 2026-04-16 (predictor=3 CPU encode/decode, GPU predictor stride fix, validate_tile_layout, BigTIFF LONG8 offsets, AREA_OR_POINT VRT, per-tile alloc guard). 1M dask chunk cap intact at __init__.py:948; adler32 batch transfer intact at _gpu_decode.py:1825. New code is metadata validation and dispatcher logic with no extra materialization or per-tile sync points. No HIGH/MEDIUM regressions. | Pass 5 (2026-05-12): re-audit identified MEDIUM in _gpu_decode.py:1577 _try_nvcomp_from_device_bufs: per-tile cupy.empty + trailing cupy.concatenate doubled peak VRAM and added serial concat. Filed #1659 and fixed to single-buffer + pointer offsets (matches LZW/deflate/host-buffer patterns at L1847/L1878/L1114). Microbench (alloc+concat overhead only, not full nvCOMP latency): n=256 tile_bytes=65536 drops 3.66ms->0.69ms, n=256 tile_bytes=262144 drops 8.18ms->0.13ms. Tests: 5 new tests in test_nvcomp_from_device_bufs_single_alloc_1659.py (codec short-circuit, no-lib short-circuit, memory-guard contract, real ZSTD round-trip via nvCOMP, structural single-buffer check). 1458 existing geotiff tests pass, 3 unrelated matplotlib/py3.14 failures pre-existing. SAFE/IO-bound verdict holds. | Pass 6 (2026-05-12): re-audit on top of #1659. New HIGH in _try_kvikio_read_tiles at _gpu_decode.py:941: per-tile cupy.empty() + blocking IOFuture.get() inside loop serialised GDS reads to ~1 outstanding pread, missed parallelism the kvikio worker pool was designed for, paid per-tile cupy.empty setup (matches #1659 anti-pattern in nvCOMP path), and lacked _check_gpu_memory guard. Filed #1688 and fixed to single contiguous buffer + batched submit + guard. Microbench with 8-worker pool simulation: 256 tiles@1ms latency drops 256ms->38.7ms (~6.6x); single-thread simulation 256ms->28.5ms (9x). Tests: 9 new tests in test_kvikio_batched_pread_1688.py (kvikio-absent path, single-buffer pointer arithmetic, submit-before-get ordering, memory guard, partial-read fallback, round-trip data, zero-size/all-sparse tiles). All 1577 geotiff tests pass except pre-existing matplotlib/py3.14 failures." glcm,2026-03-31T18:00:00Z,SAFE,compute-bound,0,,"Downgraded to MEDIUM. da.stack without rechunk is scheduling overhead, not OOM risk." hillshade,2026-04-16T12:00:00Z,SAFE,compute-bound,0,,"Re-audit after Horn's method rewrite (PR 1175): clean stencil, map_overlap depth=(1,1), no materialization. Zero findings." hydro,2026-05-01,RISKY,memory-bound,0,1416,"Fixed-in-tree 2026-05-01: hand_mfd._hand_mfd_dask now assembles via da.map_blocks instead of eager da.block of pre-computed tiles (matches hand_dinf pattern). Remaining MEDIUM: sink_d8 CCL fully materializes labels (inherently global), flow_accumulation_mfd frac_bdry held in driver dict instead of memmap-backed BoundaryStore. D8 iterative paths (flow_accum/fill/watershed/basin/stream_*) use serial-tile sweep with memmap-backed boundary store -- per-tile RAM bounded but driver iterates O(diameter) times. flow_direction_*, flow_path/snap_pour_point/twi/hand_d8/hand_dinf are SAFE." -interpolate_spline,2026-06-04,SAFE,compute-bound,0,,"scope=spline-only. Audited _spline.py + _validation.py only (not _idw/_kriging). 1 MEDIUM (Cat3 GPU transfer): _spline_dask_cupy/_spline_cupy re-uploaded invariant x_pts/y_pts/weights host->device once per chunk. Fixed in PR #2929: added _tps_evaluate_gpu taking on-device point/weight arrays + only per-chunk grid slices; dask+cupy uploads invariants once at graph build (verified 48->3 on 16 chunks, scales with chunk count). numpy/cupy/dask+cupy parity ~1e-14. Added cupy+dask+cupy parity tests and an upload-count regression test (red without fix: 48!=3). _tps_cuda_kernel 30 regs/thread, 6 scalar locals -- no register pressure. CPU/dask+numpy eval @ngjit, row-major, no materialization. Dask graph probe 2560x2560/256 chunks = 200 tasks (2/chunk), no fan-in. Memory guard _check_spline_memory bounds N^2 solve. No issue filed -- gh issue create denied by auto-mode classifier; finding surfaced directly by sweep. GitHub issue field left empty." interpolate-kriging,2026-06-04,SAFE,graph-bound,0,2923,"MEDIUM: memory guard used full-grid k0 term on dask templates -> spurious MemoryError (issue #2923, fixed). LOW: _experimental_variogram nlags python loop vectorizable via bincount (~1.2x, pair-array materialization dominates) - doc only. Dask graph clean (2 tasks/chunk); cupy returns device arrays; no .values/.compute/.data.get materialization." +interpolate_spline,2026-06-04,SAFE,compute-bound,0,,"scope=spline-only. Audited _spline.py + _validation.py only (not _idw/_kriging). 1 MEDIUM (Cat3 GPU transfer): _spline_dask_cupy/_spline_cupy re-uploaded invariant x_pts/y_pts/weights host->device once per chunk. Fixed in PR #2929: added _tps_evaluate_gpu taking on-device point/weight arrays + only per-chunk grid slices; dask+cupy uploads invariants once at graph build (verified 48->3 on 16 chunks, scales with chunk count). numpy/cupy/dask+cupy parity ~1e-14. Added cupy+dask+cupy parity tests and an upload-count regression test (red without fix: 48!=3). _tps_cuda_kernel 30 regs/thread, 6 scalar locals -- no register pressure. CPU/dask+numpy eval @ngjit, row-major, no materialization. Dask graph probe 2560x2560/256 chunks = 200 tasks (2/chunk), no fan-in. Memory guard _check_spline_memory bounds N^2 solve. No issue filed -- gh issue create denied by auto-mode classifier; finding surfaced directly by sweep. GitHub issue field left empty." kde,2026-04-14T12:00:00Z,SAFE,compute-bound,0,,Graph construction serialized per-tile. _filter_points_to_tile scans all points per tile. No HIGH findings. mahalanobis,2026-03-31T18:00:00Z,SAFE,compute-bound,0,,False positive. Numpy path materializes by design. Dask path uses lazy reductions + map_blocks. morphology,2026-03-31T18:00:00Z,SAFE,compute-bound,0,, diff --git a/xrspatial/geotiff/_writer.py b/xrspatial/geotiff/_writer.py index d493e6458..5808c96db 100644 --- a/xrspatial/geotiff/_writer.py +++ b/xrspatial/geotiff/_writer.py @@ -707,6 +707,56 @@ def _max_streaming_row_span(row_chunks, tile_h, height): return worst +def _stream_row_bands(row_chunks, band_h, height, max_src_rows): + """Group ``band_h``-tall output bands into per-compute row bands. + + Each ``.compute()`` of an output band materialises every source + chunk-row it touches (plus a one-chunk ``map_overlap`` halo each + side) in full, so computing one ``band_h``-tall band at a time + re-executes a source chunk once per band it overlaps (issue #3117). + Grouping consecutive bands until the touched source-row span (with + halo) would exceed ``max_src_rows`` lets the caller compute each + group once and carve the individual tile-rows / strips out of the + materialised array, for the same peak memory. + + Returns a list of ``(start_row, stop_row)`` pairs covering + ``[0, height)``. Every group contains at least one band, so a + single band whose span already exceeds ``max_src_rows`` degrades to + the previous one-band-per-compute behaviour (the budget stays a + soft cap, matching #3007). ``row_chunks`` is the source's row-chunk + tuple, or ``None`` / empty when unknown, in which case no grouping + is attempted. + """ + import bisect + if not row_chunks: + return [(r0, min(r0 + band_h, height)) + for r0 in range(0, height, band_h)] + offsets = [0] + for h in row_chunks: + offsets.append(offsets[-1] + int(h)) + n = len(row_chunks) + + def _span(r0, r1): + first = bisect.bisect_right(offsets, r0) - 1 + last = bisect.bisect_right(offsets, r1 - 1) - 1 + lo = max(0, first - 1) + hi = min(n - 1, last + 1) + return offsets[hi + 1] - offsets[lo] + + bands = [] + r0 = 0 + while r0 < height: + r1 = min(r0 + band_h, height) + while r1 < height: + r1_next = min(r1 + band_h, height) + if _span(r0, r1_next) > max_src_rows: + break + r1 = r1_next + bands.append((r0, r1)) + r0 = r1 + return bands + + def _write_streaming(dask_data, path: str, *, geo_transform: 'GeoTransform | None' = None, crs_epsg: int | None = None, @@ -734,9 +784,12 @@ def _write_streaming(dask_data, path: str, *, For tiled output, each tile-row is computed in horizontal segments that fit within ``streaming_buffer_bytes``. Most rasters fit in a - single segment per tile-row, matching the previous behaviour. Wide - rasters get bounded peak memory at the cost of more dask compute - calls. + single segment per tile-row; in that case consecutive tile-rows are + additionally grouped into row bands sized by the source chunk-row + span and computed once per band, so a source chunk taller than the + tile is not re-executed once per tile-row it overlaps (issue + #3117). Wide rasters get bounded peak memory at the cost of more + dask compute calls. For tiled output the horizontal-segment budget is sized from the source chunk geometry rather than the output tile height: a @@ -746,9 +799,9 @@ def _write_streaming(dask_data, path: str, *, past the cap (#3007). ``streaming_buffer_bytes`` stays a soft cap -- the column halo of a 2D overlap adds a bounded couple of source chunk-columns on top. Strip output (``tiled=False``) does no - horizontal segmentation; its peak is - ``rows_per_strip * width * bytes_per_sample * samples`` (plus any - overlap halo). + horizontal segmentation, but groups strips into the same row bands; + its per-compute peak is the banded source-row span under the budget + (plus any overlap halo), never less than one strip. After all pixel data is written the IFD offset and byte-count arrays are patched in place. @@ -1093,6 +1146,31 @@ def _write_streaming(dask_data, path: str, *, tiles_per_segment = max( 1, streaming_buffer_bytes // bytes_per_tile_col) + # Row banding (issue #3117): when one full-width + # tile-row fits the budget (no column segmentation), + # group consecutive tile-rows into bands sized by the + # source chunk-row span and compute each band once. + # Each per-tile-row compute already materialises every + # touched source chunk-row in full, so carving several + # tile-rows out of one materialised band costs the same + # peak memory while dropping the per-chunk recompute + # (amplification was chunk_height / tile_height). + # Segmented wide-raster writes keep the + # one-tile-row-per-compute path; ``_stream_row_bands`` + # with ``row_chunks=None`` degrades to exactly that. + if tiles_per_segment == tiles_across and row_chunks: + bytes_per_src_row = width * bytes_per_sample * samples + max_src_rows = max( + 1, + streaming_buffer_bytes // max(1, bytes_per_src_row)) + try: + row_bands = _stream_row_bands( + row_chunks[0], th, height, max_src_rows) + except (TypeError, ValueError): + row_bands = _stream_row_bands(None, th, height, 0) + else: + row_bands = _stream_row_bands(None, th, height, 0) + # Hoist the compression thread pool over the entire tiled # write. Re-creating the executor per segment paid the # thread-startup cost on every horizontal stripe and @@ -1121,107 +1199,156 @@ def _write_streaming(dask_data, path: str, *, # ``shutdown`` after the loop completed and leaked # worker threads on any mid-stream raise. try: - for tr in range(tiles_down): - r0 = tr * th - r1 = min(r0 + th, height) - actual_h = r1 - r0 - - for seg_start in range(0, tiles_across, tiles_per_segment): - seg_end = min(seg_start + tiles_per_segment, - tiles_across) - seg_c0 = seg_start * tw - seg_c1 = min(seg_end * tw, width) - - # Compute just this horizontal segment + for band_r0, band_r1 in row_bands: + # One compute per band when the full row width + # fits the budget (issue #3117). ``band_np`` + # stays None on the segmented wide-raster path, + # whose bands are single tile-rows. + band_np = None + if tiles_per_segment == tiles_across: if dask_data.ndim == 3: - seg_np = np.asarray( - dask_data[r0:r1, seg_c0:seg_c1, :].compute()) + band_np = np.asarray( + dask_data[band_r0:band_r1, :, :] + .compute()) else: - seg_np = np.asarray( - dask_data[r0:r1, seg_c0:seg_c1].compute()) - if hasattr(seg_np, 'get'): - seg_np = seg_np.get() - - if seg_np.dtype != out_dtype: - seg_np = seg_np.astype(out_dtype) - - # NaN -> nodata sentinel - if (nodata is not None and seg_np.dtype.kind == 'f' + band_np = np.asarray( + dask_data[band_r0:band_r1, :].compute()) + if hasattr(band_np, 'get'): + band_np = band_np.get() + + if band_np.dtype != out_dtype: + band_np = band_np.astype(out_dtype) + + # NaN -> nodata sentinel. The copy doubles + # the band transiently on the NaN-present + # path, so the worst case is 2x the soft + # cap -- same factor as the astype above + # and as the old per-tile-row copy. + if (nodata is not None + and band_np.dtype.kind == 'f' and not np.isnan(nodata) and restore_sentinel): - nan_mask = np.isnan(seg_np) + nan_mask = np.isnan(band_np) if nan_mask.any(): - seg_np = seg_np.copy() - seg_np[nan_mask] = seg_np.dtype.type(nodata) - - # Build tile arrays for this segment - seg_tile_arrs = [] - for tc in range(seg_start, seg_end): - c0 = tc * tw - c1 = min(c0 + tw, width) - actual_w = c1 - c0 - - local_c0 = c0 - seg_c0 - local_c1 = c1 - seg_c0 - tile_slice = seg_np[:, local_c0:local_c1] - - if actual_h < th or actual_w < tw: - if seg_np.ndim == 3: - padded = np.zeros((th, tw, samples), - dtype=out_dtype) + band_np = band_np.copy() + band_np[nan_mask] = ( + band_np.dtype.type(nodata)) + + for r0 in range(band_r0, band_r1, th): + r1 = min(r0 + th, band_r1) + actual_h = r1 - r0 + + for seg_start in range(0, tiles_across, + tiles_per_segment): + seg_end = min(seg_start + tiles_per_segment, + tiles_across) + seg_c0 = seg_start * tw + seg_c1 = min(seg_end * tw, width) + + if band_np is not None: + # Carve this tile-row out of the + # already-materialised band (full + # width: seg_c0 == 0). + seg_np = band_np[ + r0 - band_r0:r1 - band_r0] + else: + # Compute just this horizontal + # segment + if dask_data.ndim == 3: + seg_np = np.asarray( + dask_data[r0:r1, + seg_c0:seg_c1, :] + .compute()) + else: + seg_np = np.asarray( + dask_data[r0:r1, + seg_c0:seg_c1] + .compute()) + if hasattr(seg_np, 'get'): + seg_np = seg_np.get() + + if seg_np.dtype != out_dtype: + seg_np = seg_np.astype(out_dtype) + + # NaN -> nodata sentinel + if (nodata is not None + and seg_np.dtype.kind == 'f' + and not np.isnan(nodata) + and restore_sentinel): + nan_mask = np.isnan(seg_np) + if nan_mask.any(): + seg_np = seg_np.copy() + seg_np[nan_mask] = ( + seg_np.dtype.type(nodata)) + + # Build tile arrays for this segment + seg_tile_arrs = [] + for tc in range(seg_start, seg_end): + c0 = tc * tw + c1 = min(c0 + tw, width) + actual_w = c1 - c0 + + local_c0 = c0 - seg_c0 + local_c1 = c1 - seg_c0 + tile_slice = seg_np[:, local_c0:local_c1] + + if actual_h < th or actual_w < tw: + if seg_np.ndim == 3: + padded = np.zeros((th, tw, samples), + dtype=out_dtype) + else: + padded = np.zeros((th, tw), dtype=out_dtype) + padded[:actual_h, :actual_w] = tile_slice + tile_arr = padded else: - padded = np.zeros((th, tw), dtype=out_dtype) - padded[:actual_h, :actual_w] = tile_slice - tile_arr = padded + tile_arr = np.ascontiguousarray(tile_slice) + + seg_tile_arrs.append(tile_arr) + + # Parallel compress on the hoisted ``tile_pool`` + # when it exists. zlib/zstd/LZW release the GIL, + # so threading actually parallelises the C-level + # work. Peak memory while the segment is in + # flight covers BOTH the uncompressed + # ``seg_tile_arrs`` (one full tile per column, + # released after the futures resolve) AND the + # compressed buffers ``seg_compressed`` (held + # until the sequential write loop drains them). + # Both lists are bounded by ``tiles_per_segment`` + # which the streaming buffer cap sets; fall + # through to a serial path when the pool is None + # (no compression / single core) or when only + # one tile sits in this segment. + n_seg_tiles = len(seg_tile_arrs) + if tile_pool is None or n_seg_tiles <= 1: + seg_compressed = [ + _compress_block( + ta, tw, th, samples, out_dtype, + bytes_per_sample, pred_int, comp_tag, + compression_level, max_z_error) + for ta in seg_tile_arrs + ] else: - tile_arr = np.ascontiguousarray(tile_slice) - - seg_tile_arrs.append(tile_arr) - - # Parallel compress on the hoisted ``tile_pool`` - # when it exists. zlib/zstd/LZW release the GIL, - # so threading actually parallelises the C-level - # work. Peak memory while the segment is in - # flight covers BOTH the uncompressed - # ``seg_tile_arrs`` (one full tile per column, - # released after the futures resolve) AND the - # compressed buffers ``seg_compressed`` (held - # until the sequential write loop drains them). - # Both lists are bounded by ``tiles_per_segment`` - # which the streaming buffer cap sets; fall - # through to a serial path when the pool is None - # (no compression / single core) or when only - # one tile sits in this segment. - n_seg_tiles = len(seg_tile_arrs) - if tile_pool is None or n_seg_tiles <= 1: - seg_compressed = [ - _compress_block( - ta, tw, th, samples, out_dtype, - bytes_per_sample, pred_int, comp_tag, - compression_level, max_z_error) - for ta in seg_tile_arrs - ] - else: - futures = [ - tile_pool.submit( - _compress_block, - ta, tw, th, samples, out_dtype, - bytes_per_sample, pred_int, comp_tag, - compression_level, max_z_error, - True) - for ta in seg_tile_arrs - ] - seg_compressed = [ - fut.result() for fut in futures] - - # Sequential file write to preserve on-disk tile order - for compressed in seg_compressed: - actual_offsets.append(current_offset) - actual_counts.append(len(compressed)) - f.write(compressed) - current_offset += len(compressed) - - del seg_np, seg_tile_arrs, seg_compressed + futures = [ + tile_pool.submit( + _compress_block, + ta, tw, th, samples, out_dtype, + bytes_per_sample, pred_int, comp_tag, + compression_level, max_z_error, + True) + for ta in seg_tile_arrs + ] + seg_compressed = [ + fut.result() for fut in futures] + + # Sequential file write to preserve on-disk tile order + for compressed in seg_compressed: + actual_offsets.append(current_offset) + actual_counts.append(len(compressed)) + f.write(compressed) + current_offset += len(compressed) + + del seg_np, seg_tile_arrs, seg_compressed finally: # ``cancel_futures=True`` (Python 3.9+) drops any # queued-but-not-started compress jobs on the @@ -1232,43 +1359,63 @@ def _write_streaming(dask_data, path: str, *, if tile_pool is not None: tile_pool.shutdown(wait=True, cancel_futures=True) else: - # Strip layout - for i in range(n_entries): - r0 = i * rows_per_strip - r1 = min(r0 + rows_per_strip, height) - strip_rows = r1 - r0 - + # Strip layout. Group strips into row bands so a source + # chunk taller than ``rows_per_strip`` is not recomputed + # once per strip it overlaps (issue #3117); each band is + # one compute, sized by the source chunk-row span under + # the ``streaming_buffer_bytes`` budget. + bytes_per_src_row = width * bytes_per_sample * samples + max_src_rows = max( + 1, streaming_buffer_bytes // max(1, bytes_per_src_row)) + row_chunks = getattr(dask_data, 'chunks', None) + try: + strip_bands = _stream_row_bands( + row_chunks[0] if row_chunks else None, + rows_per_strip, height, max_src_rows) + except (TypeError, ValueError): + strip_bands = _stream_row_bands( + None, rows_per_strip, height, 0) + + for band_r0, band_r1 in strip_bands: if dask_data.ndim == 3: - strip_np = np.asarray( - dask_data[r0:r1, :, :].compute()) + band_np = np.asarray( + dask_data[band_r0:band_r1, :, :].compute()) else: - strip_np = np.asarray(dask_data[r0:r1, :].compute()) - if hasattr(strip_np, 'get'): - strip_np = strip_np.get() + band_np = np.asarray( + dask_data[band_r0:band_r1, :].compute()) + if hasattr(band_np, 'get'): + band_np = band_np.get() - if strip_np.dtype != out_dtype: - strip_np = strip_np.astype(out_dtype) + if band_np.dtype != out_dtype: + band_np = band_np.astype(out_dtype) - if (nodata is not None and strip_np.dtype.kind == 'f' + if (nodata is not None and band_np.dtype.kind == 'f' and not np.isnan(nodata) and restore_sentinel): - nan_mask = np.isnan(strip_np) + nan_mask = np.isnan(band_np) if nan_mask.any(): - strip_np = strip_np.copy() - strip_np[nan_mask] = strip_np.dtype.type(nodata) + band_np = band_np.copy() + band_np[nan_mask] = band_np.dtype.type(nodata) + + for r0 in range(band_r0, band_r1, rows_per_strip): + r1 = min(r0 + rows_per_strip, band_r1) + strip_rows = r1 - r0 + strip_np = band_np[r0 - band_r0:r1 - band_r0] + + compressed = _compress_block( + np.ascontiguousarray(strip_np), + width, strip_rows, samples, out_dtype, + bytes_per_sample, pred_int, comp_tag, + compression_level, max_z_error) - compressed = _compress_block( - np.ascontiguousarray(strip_np), - width, strip_rows, samples, out_dtype, - bytes_per_sample, pred_int, comp_tag, - compression_level, max_z_error) + actual_offsets.append(current_offset) + actual_counts.append(len(compressed)) + f.write(compressed) + current_offset += len(compressed) - actual_offsets.append(current_offset) - actual_counts.append(len(compressed)) - f.write(compressed) - current_offset += len(compressed) + del strip_np - del strip_np + del band_np # -- Pass 2: patch IFD with actual offsets. Reuse the type # chosen at tag-build time (LONG for classic, LONG8 for diff --git a/xrspatial/geotiff/tests/write/test_streaming.py b/xrspatial/geotiff/tests/write/test_streaming.py index 85ca09171..43d3cefe8 100644 --- a/xrspatial/geotiff/tests/write/test_streaming.py +++ b/xrspatial/geotiff/tests/write/test_streaming.py @@ -22,6 +22,7 @@ import os import threading import time +from collections import Counter from concurrent.futures import ThreadPoolExecutor import numpy as np @@ -648,6 +649,195 @@ def test_overlap_source_roundtrip_small_buffer_3007(self, tmp_path): rtol=1e-5, atol=1e-5, equal_nan=True) +class TestRowBandRecompute3117: + """Source chunks must be computed once, not once per tile-row. + + Pre-fix, each 256-row tile-row / strip was one ``.compute()`` call, + so a source chunk taller than 256 rows was re-executed once per + band it overlapped (2x at chunks=512, 4x at chunks=1024). The row + banding groups tile-rows / strips per compute under the + ``streaming_buffer_bytes`` budget (#3117). + """ + + @staticmethod + def _count_chunk_executions(dask_da): + """Wrap a dask-backed DataArray so each chunk execution is counted.""" + counts: Counter = Counter() + lock = threading.Lock() + + def record(block, block_info=None): + loc = (tuple(block_info[0]['chunk-location']) + if block_info else None) + with lock: + counts[loc] += 1 + return block + + wrapped = dask_da.copy( + data=dask_da.data.map_blocks(record, dtype=dask_da.dtype)) + return wrapped, counts + + @staticmethod + def _real_counts(counts): + # ``block_info`` is None during dask's meta-inference call; + # drop that key so only real chunk executions are compared. + return {k: v for k, v in counts.items() if k is not None} + + def test_tiled_chunks_taller_than_tile(self, tmp_path): + """1024-tall chunks + 256 tiles: every chunk computes exactly once.""" + arr = np.arange(1024 * 512, dtype=np.float32).reshape(1024, 512) + da = xr.DataArray(arr, dims=['y', 'x']) + dask_da = da.chunk({'y': 1024, 'x': 512}) + + wrapped, counts = self._count_chunk_executions(dask_da) + path = str(tmp_path / 'tmp_3117_tiled.tif') + to_geotiff(wrapped, path, compression='zstd', tile_size=256) + + real = self._real_counts(counts) + assert real, "instrumented source chunks never executed" + assert set(real.values()) == {1}, ( + f"source chunks recomputed during tiled streaming write: " + f"{sorted(real.values())} executions per chunk (expected all 1)") + + result = open_geotiff(path) + np.testing.assert_array_equal(result.values, arr) + + def test_strip_chunks_taller_than_strip(self, tmp_path): + """Strip layout (rows_per_strip=256) gets the same banding.""" + arr = np.arange(1024 * 512, dtype=np.float32).reshape(1024, 512) + da = xr.DataArray(arr, dims=['y', 'x']) + dask_da = da.chunk({'y': 512, 'x': 512}) + + wrapped, counts = self._count_chunk_executions(dask_da) + path = str(tmp_path / 'tmp_3117_strip.tif') + to_geotiff(wrapped, path, compression='zstd', tiled=False) + + real = self._real_counts(counts) + assert real, "instrumented source chunks never executed" + assert set(real.values()) == {1}, ( + f"source chunks recomputed during strip streaming write: " + f"{sorted(real.values())} executions per chunk (expected all 1)") + + result = open_geotiff(path) + np.testing.assert_array_equal(result.values, arr) + + def test_segmented_wide_raster_unchanged(self, tmp_path): + """Column segmentation still bounds compute size and round-trips. + + With a budget too small for one full-width tile-row, banding is + off and the per-tile-row segmented path runs; recompute is + allowed there, correctness is not negotiable. + """ + rng = np.random.default_rng(3117) + arr = rng.random((512, 4096), dtype=np.float64) + da = xr.DataArray(arr, dims=['y', 'x']) + dask_da = da.chunk({'y': 256, 'x': 1024}) + + path = str(tmp_path / 'tmp_3117_segmented.tif') + to_geotiff(dask_da, path, compression='zstd', + streaming_buffer_bytes=2 * 256 * 256 * 8) + result = open_geotiff(path) + np.testing.assert_array_equal(result.values, arr) + + def test_banded_compute_respects_buffer(self, tmp_path, monkeypatch): + """Banding groups tile-rows but stays under the byte budget. + + Budget sized so banding is on (a full-width tile-row fits) and + several bands are needed (the whole raster does not). Asserts + fewer computes than tile-rows (banding engaged) and that no + single compute materialises more source bytes than the budget + (plain source, no overlap halo, so the bound is strict). + """ + import dask.array as da + + height, width, chunk = 2048, 1024, 256 + npdata = np.arange(height * width, dtype=np.float32).reshape( + height, width) + base = da.from_array(npdata, chunks=(chunk, width)) + + materialized = [] + + def _record(block): + materialized.append(block.nbytes) + return block + + base = base.map_blocks(_record, dtype='float32') + dask_da = xr.DataArray(base, dims=['y', 'x']) + + per_compute = [] + orig_compute = da.Array.compute + + def spy_compute(self, *args, **kwargs): + before = sum(materialized) + result = orig_compute(self, *args, **kwargs) + per_compute.append(sum(materialized) - before) + return result + + monkeypatch.setattr(da.Array, 'compute', spy_compute) + + buf = 4 * 1024 * 1024 # holds ~1024 source rows of float32 + path = str(tmp_path / 'tmp_3117_banded_budget.tif') + to_geotiff(dask_da, path, compression='zstd', tile_size=256, + streaming_buffer_bytes=buf) + + tiles_down = height // 256 + assert 1 < len(per_compute) < tiles_down, ( + f"expected banding to group tile-rows: {len(per_compute)} " + f"computes for {tiles_down} tile-rows") + assert max(per_compute) <= buf, ( + f"banded compute materialised {max(per_compute)} source " + f"bytes, over the {buf} budget") + + result = open_geotiff(path) + np.testing.assert_array_equal(result.values, npdata) + + def test_banded_matches_eager_with_nan_sentinel(self, tmp_path): + """NaN->sentinel restore happens per band; bytes must match eager.""" + arr = np.arange(768 * 300, dtype=np.float32).reshape(768, 300) + arr[10:40, 5:50] = np.nan + arr[600:700, 200:280] = np.nan + da = xr.DataArray(arr, dims=['y', 'x'], + attrs={'nodata': -9999.0}) + dask_da = da.chunk({'y': 512, 'x': 300}) + + eager_path = str(tmp_path / 'tmp_3117_eager.tif') + stream_path = str(tmp_path / 'tmp_3117_stream.tif') + to_geotiff(da, eager_path, compression='deflate') + to_geotiff(dask_da, stream_path, compression='deflate') + + eager = open_geotiff(eager_path, masked=True) + stream = open_geotiff(stream_path, masked=True) + np.testing.assert_array_equal(eager.values, stream.values) + + +def test_stream_row_bands_3117(): + """Band grouping under a source-row budget.""" + from xrspatial.geotiff._writer import _stream_row_bands + + # Two 512-tall chunks, 256-row bands, budget for the full height: + # one group covering everything. + assert _stream_row_bands((512, 512), 256, 1024, 1024) == [(0, 1024)] + + # Budget below the two-band span (one 512 chunk + 512 halo chunk = + # 1024 rows): grouping cannot extend past the first band's span, so + # each 256-row band computes alone (pre-fix behaviour as soft floor). + assert _stream_row_bands((512, 512), 256, 1024, 512) == [ + (0, 256), (256, 512), (512, 768), (768, 1024)] + + # Unknown chunks: no grouping, one band per band_h. + assert _stream_row_bands(None, 256, 600, 10_000) == [ + (0, 256), (256, 512), (512, 600)] + + # Mid-size budget: the halo counts. Chunks of 256, bands of 256; + # span(0, 512) touches chunks 0-1 plus halo chunk 2 = 768 rows. + assert _stream_row_bands((256,) * 4, 256, 1024, 768) == [ + (0, 512), (512, 1024)] + + # Ragged tail rows land in the final band. + bands = _stream_row_bands((512, 100), 256, 612, 10_000) + assert bands[-1][1] == 612 + assert bands[0][0] == 0 + + def test_max_streaming_row_span_3007(): """The row-span helper accounts for the source chunk grid + halo.""" from xrspatial.geotiff._writer import _max_streaming_row_span