From 3b4798b64fe47affe40418250393a66efaf66394 Mon Sep 17 00:00:00 2001 From: Brendan Collins Date: Thu, 11 Jun 2026 12:59:15 -0700 Subject: [PATCH 1/2] Stream dask input through the GPU writer per tile-row band (#3166) --- xrspatial/geotiff/_writers/eager.py | 33 ++-- xrspatial/geotiff/_writers/gpu.py | 168 ++++++++++++++++----- xrspatial/geotiff/tests/gpu/test_writer.py | 149 +++++++++++++++--- 3 files changed, 276 insertions(+), 74 deletions(-) diff --git a/xrspatial/geotiff/_writers/eager.py b/xrspatial/geotiff/_writers/eager.py index 53bb8c7bc..41860596b 100644 --- a/xrspatial/geotiff/_writers/eager.py +++ b/xrspatial/geotiff/_writers/eager.py @@ -108,13 +108,14 @@ def to_geotiff(data: xr.DataArray | np.ndarray, chunk-rows at once (#3007). COG output (``cog=True``) still materialises because overviews need the full array. - Streaming does NOT apply to the GPU writer. CuPy-backed dask input - (dask+cupy) auto-dispatches to the GPU writer, which materialises - the entire array on device before encoding; - ``streaming_buffer_bytes`` is ignored there and a - ``GeoTIFFFallbackWarning`` is emitted when the materialisation - happens (issue #3166). The same applies when ``gpu=True`` is passed - with any dask-backed input. + Dask input routed to the GPU writer (auto-detected dask+cupy, or + ``gpu=True`` with any dask backing) also streams: each tile-row + band is computed onto the device, compressed, and released before + the next, with the per-compute budget capped by + ``streaming_buffer_bytes`` (issue #3166). The exception is + ``cog=True``, which materialises the full array on device because + overview generation needs it, and emits a + ``GeoTIFFFallbackWarning`` when it does. Automatically dispatches to GPU compression when: - ``gpu=True`` is passed, or @@ -252,20 +253,20 @@ def to_geotiff(data: xr.DataArray | np.ndarray, acceleration; backend parity with the CPU writer is tested for the Tier 1 codec set only. Force GPU compression. None (default) auto-detects CuPy data, including CuPy-backed dask - arrays. Dask-backed input routed to the GPU writer is - materialised in full on device (no streaming; see + arrays. Dask-backed input routed to the GPU writer streams one + tile-row band at a time unless ``cog=True`` (see ``streaming_buffer_bytes``). streaming_buffer_bytes : int [stable] Soft cap on bytes materialised per dask compute call when streaming a dask-backed DataArray. Defaults to 256 MB. Wide rasters whose tile-row exceeds this budget are split into - horizontal segments. Only applies to dask-backed inputs on the - CPU write path; the kwarg is a no-op for numpy / CuPy / COG - paths (the COG path materialises the full array because the - overview pyramid needs it) and for the GPU writer. dask+cupy - input auto-dispatches to the GPU writer, so it materialises in - full on device instead of streaming and emits a - ``GeoTIFFFallbackWarning`` (issue #3166). + horizontal segments on the CPU path. On the GPU path the cap + bounds the device bytes computed per tile-row band, with a + floor of one full-width tile-row (issue #3166). The kwarg is a + no-op for in-memory (numpy / CuPy) input and for COG output, + which materialises the full array because the overview pyramid + needs it; a dask-backed ``cog=True`` GPU write emits a + ``GeoTIFFFallbackWarning`` when it materialises. max_z_error : float [experimental] Per-pixel error budget for LERC compression. ``0.0`` (default) is lossless; larger values let the encoder diff --git a/xrspatial/geotiff/_writers/gpu.py b/xrspatial/geotiff/_writers/gpu.py index 526edc3fc..d2260b26f 100644 --- a/xrspatial/geotiff/_writers/gpu.py +++ b/xrspatial/geotiff/_writers/gpu.py @@ -35,17 +35,18 @@ def _warn_dask_materialised() -> None: """Warn that a dask-backed input is about to be fully materialised. - The GPU writer has no streaming mode: ``to_geotiff``'s dask - streaming contract only applies to the CPU path, and dask+cupy - input auto-dispatches here (issue #3166). Emitted from both - ``.compute()`` sites in ``_write_geotiff_gpu`` so explicit and - auto-detected GPU writes warn the same way. + Non-COG dask input streams through the GPU writer one tile-row + band at a time (issue #3166), so this fires only from the + ``cog=True`` ``.compute()`` sites in ``_write_geotiff_gpu``: + overview generation needs the full-resolution array on device. + Emitted from both the DataArray and positional compute sites so + explicit and auto-detected GPU writes warn the same way. Deliberately does NOT participate in the ``XRSPATIAL_GEOTIFF_STRICT`` promotion that most ``GeoTIFFFallbackWarning`` sites apply: there is no opt-out flag for the materialisation, so promotion would turn every dask+cupy - GPU write into a hard failure. Same shape as the JPEG / + COG write into a hard failure. Same shape as the JPEG / experimental-codec opt-in warnings, which also stay warnings under strict mode. @@ -54,10 +55,10 @@ def _warn_dask_materialised() -> None: has already run by the time the fallback fires. """ warnings.warn( - "Dask-backed input routed to the GPU writer is materialised " - "in full on device before encoding; the GPU writer has no " - "streaming mode and streaming_buffer_bytes is ignored. " - "See issue #3166.", + "Dask-backed input with cog=True is materialised in full on " + "device before encoding: GPU overview generation needs the " + "whole array, so streaming_buffer_bytes is ignored. Pass " + "cog=False to stream per tile-row band. See issue #3166.", GeoTIFFFallbackWarning, stacklevel=3, ) @@ -143,9 +144,11 @@ def _write_geotiff_gpu(data: xr.DataArray | cupy.ndarray | np.ndarray, ---------- data : xr.DataArray (CuPy- or NumPy-backed), cupy.ndarray, or np.ndarray [experimental] 2D or 3D raster. CuPy-backed inputs stay on - device; NumPy/Dask inputs are uploaded via + device; NumPy inputs are uploaded via ``cupy.asarray(np.asarray(data))`` before compression (matches - ``to_geotiff`` parity). + ``to_geotiff`` parity). Dask-backed inputs stream one tile-row + band at a time unless ``cog=True`` (see + ``streaming_buffer_bytes``). path : str or binary file-like [experimental] Output file path or any object with a ``write`` method (e.g. ``io.BytesIO``). ``cog=True`` requires a string @@ -241,13 +244,18 @@ def _write_geotiff_gpu(data: xr.DataArray | cupy.ndarray | np.ndarray, auto-promotes when the estimated file size would exceed the classic-TIFF 4 GB limit. streaming_buffer_bytes : int - [internal-only] Accepted for API parity with ``to_geotiff``. - The GPU writer materialises the entire array on device and has - no streaming concept, so this kwarg is a no-op. Default matches - ``to_geotiff`` (256 MB) so callers passing the same kwargs to - either entry point see the same default and the same type. - Dask-backed inputs emit a ``GeoTIFFFallbackWarning`` when they - are materialised (issue #3166). + [experimental] Soft cap on the bytes materialised on device + per dask compute call when the input is dask-backed and + ``cog=False`` (issue #3166). Default matches ``to_geotiff`` + (256 MB). The writer computes and compresses one tile-row band + at a time, grouping consecutive tile-rows by the source + chunk-row span under this budget the same way the CPU + streaming writer does; the floor is one full-width tile-row, + so the cap is soft, and the tile-extraction buffer adds about + one band-sized allocation on top. In-memory (cupy / numpy) + input ignores the kwarg. ``cog=True`` still materialises + dask input in full on device (overview generation needs the + whole array) and emits a ``GeoTIFFFallbackWarning``. max_z_error : float [internal-only] Per-pixel error budget for LERC compression. The GPU writer does not implement LERC (nvCOMP has no LERC @@ -496,11 +504,7 @@ def _write_geotiff_gpu(data: xr.DataArray | cupy.ndarray | np.ndarray, raise TypeError( f"path must be a str or a binary file-like with a write() " f"method, got {type(path).__name__}") - # streaming_buffer_bytes is intentionally a no-op on the GPU path; - # the kwarg exists for API parity with to_geotiff so callers can pass - # the same kwargs to both entry points without filtering. - del streaming_buffer_bytes - # ``pack`` is likewise a no-op here: ``to_geotiff`` applies the + # ``pack`` is a no-op here: ``to_geotiff`` applies the # re-pack transform before dispatching to this writer, so the kwarg # only needs to exist for signature parity (#3064). del pack @@ -542,13 +546,16 @@ def _write_geotiff_gpu(data: xr.DataArray | cupy.ndarray | np.ndarray, if isinstance(data, xr.DataArray): restore_sentinel = _should_restore_nan_sentinel(data.attrs) arr = data.data - # Handle Dask arrays: compute to materialize - if hasattr(arr, 'compute'): + # Dask arrays: only cog=True materialises here (overview + # generation needs the full-resolution array on device). + # Otherwise the array stays lazy and the streaming loop below + # computes one tile-row band at a time (issue #3166). + if hasattr(arr, 'compute') and cog: _warn_dask_materialised() arr = arr.compute() - # Now arr should be CuPy or numpy - if hasattr(arr, 'get'): - pass # CuPy array, already on GPU + # Now arr should be CuPy, numpy, or a still-lazy dask array + if hasattr(arr, 'get') or hasattr(arr, 'compute'): + pass # CuPy (on GPU) or dask (uploaded per band below) else: arr = cupy.asarray(np.asarray(arr)) # numpy -> GPU @@ -562,9 +569,15 @@ def _write_geotiff_gpu(data: xr.DataArray | cupy.ndarray | np.ndarray, # rioxarray and CF-style multi-band rasters land here; without # this remap the writer treats arr.shape[2] as the band axis and # produces a transposed file. The CPU writer does the same remap - # at the matching step in to_geotiff(). + # at the matching step in to_geotiff(). Dask input remaps + # lazily; the per-band ``ascontiguousarray`` in the streaming + # loop replaces the eager one here. if arr.ndim == 3 and data.dims[0] in _BAND_DIM_NAMES: - arr = cupy.ascontiguousarray(cupy.moveaxis(arr, 0, -1)) + if hasattr(arr, 'compute'): + import dask.array as _da + arr = _da.moveaxis(arr, 0, -1) + else: + arr = cupy.ascontiguousarray(cupy.moveaxis(arr, 0, -1)) # Resolve via the centralised resolver. Same precedence as the # CPU writer: prefer attrs['transform'] (bit-stable on @@ -615,10 +628,13 @@ def _write_geotiff_gpu(data: xr.DataArray | cupy.ndarray | np.ndarray, y_res = _rich['y_resolution'] res_unit = _rich['resolution_unit'] else: - if hasattr(data, 'compute'): + if hasattr(data, 'compute') and cog: + # Overview generation needs the full array on device. _warn_dask_materialised() data = data.compute() # Dask -> CuPy or numpy - if hasattr(data, 'device'): + if hasattr(data, 'compute'): + arr = data # still-lazy dask; streamed per band below + elif hasattr(data, 'device'): arr = data # already CuPy elif hasattr(data, 'get'): arr = data # CuPy @@ -632,6 +648,11 @@ def _write_geotiff_gpu(data: xr.DataArray | cupy.ndarray | np.ndarray, samples = arr.shape[2] if arr.ndim == 3 else 1 np_dtype = np.dtype(str(arr.dtype)) # cupy dtype -> numpy dtype + # Dask input that survived the cog gates above streams below: the + # full-resolution part is compressed one tile-row band at a time + # instead of materialising the whole array on device (issue #3166). + stream_dask = hasattr(arr, 'compute') + # Mirror the CPU writer's NaN-to-sentinel substitution. # Without this step the GPU writer emits raw NaN bytes interleaved # with valid data even when ``nodata=`` is supplied; the @@ -651,11 +672,15 @@ def _write_geotiff_gpu(data: xr.DataArray | cupy.ndarray | np.ndarray, # the cost is one GPU array allocation, only on the NaN-present # path, and it guarantees the CPU writer's defensive-copy semantics # in every case. - # Shared NodataLifecycle gate for NaN->sentinel restore. - if _NL(declared=nodata, dtype_in=np_dtype).writer_restore_sentinel( - buffer_dtype=np_dtype, - restore_sentinel=restore_sentinel, - ): + # Shared NodataLifecycle gate for NaN->sentinel restore. The + # streaming path applies the same rewrite per tile-row band inside + # ``_gpu_stream_compress_to_part`` below, after each band is + # computed onto the device. + rewrite_nodata = _NL(declared=nodata, dtype_in=np_dtype).writer_restore_sentinel( + buffer_dtype=np_dtype, + restore_sentinel=restore_sentinel, + ) + if rewrite_nodata and not stream_dask: nan_mask = cupy.isnan(arr) if bool(nan_mask.any()): arr = arr.copy() @@ -680,8 +705,71 @@ def _gpu_compress_to_part(gpu_arr, w, h, spp): stub = np.empty((1, 1, spp) if spp > 1 else (1, 1), dtype=np_dtype) return (stub, w, h, rel_off, bc, compressed) - # Full resolution - parts = [_gpu_compress_to_part(arr, width, height, samples)] + def _gpu_stream_compress_to_part(lazy_arr, w, h, spp): + """Stream-compress a dask array into the same part tuple as + ``_gpu_compress_to_part`` without materialising it in full. + + Computes one tile-row band at a time, grouping consecutive + tile-rows by the source chunk-row span under + ``streaming_buffer_bytes`` via the CPU streaming writer's + ``_stream_row_bands`` helper (so a source chunked taller than + the tile is not recomputed once per tile-row, issue #3117 / + #3007). The floor is one full-width tile-row, so the budget is + a soft cap. + + Per-band compression is byte-identical to one whole-image + ``gpu_compress_tiles`` call: bands are tile-row aligned, tiles + are independent in the TIFF layout, and the tile-extraction + kernel zero-pads edge tiles per band exactly as it does for + the full image. The NaN->sentinel rewrite runs per band after + the compute, mirroring the full-array rewrite above. + """ + from .._writer import _stream_row_bands + bytes_per_src_row = max(1, w * np_dtype.itemsize * spp) + max_src_rows = max(1, streaming_buffer_bytes // bytes_per_src_row) + row_chunks = getattr(lazy_arr, 'chunks', None) + try: + row_bands = _stream_row_bands( + row_chunks[0] if row_chunks else None, + tile_size, h, max_src_rows) + except (TypeError, ValueError): + row_bands = _stream_row_bands(None, tile_size, h, 0) + compressed = [] + for r0, r1 in row_bands: + band = lazy_arr[r0:r1].compute() + if hasattr(band, 'get'): + # CuPy band. ``da.moveaxis`` chunks may compute to + # transposed views; the tile-extraction kernel needs a + # contiguous device buffer (no-op when already + # contiguous). + band = cupy.ascontiguousarray(band) + else: + band = cupy.asarray(np.asarray(band)) # numpy -> GPU + if rewrite_nodata: + nan_mask = cupy.isnan(band) + if bool(nan_mask.any()): + band = band.copy() + band[nan_mask] = np_dtype.type(nodata) + compressed.extend(gpu_compress_tiles( + band, tile_size, tile_size, w, r1 - r0, + comp_tag, pred_val, np_dtype, spp, + compression_level=compression_level)) + rel_off = [] + bc = [] + off = 0 + for tile in compressed: + rel_off.append(off) + bc.append(len(tile)) + off += len(tile) + stub = np.empty((1, 1, spp) if spp > 1 else (1, 1), dtype=np_dtype) + return (stub, w, h, rel_off, bc, compressed) + + # Full resolution. Dask input streams (cog=True materialised it + # above, so ``stream_dask`` is False on the overview path). + if stream_dask: + parts = [_gpu_stream_compress_to_part(arr, width, height, samples)] + else: + parts = [_gpu_compress_to_part(arr, width, height, samples)] # Overview generation mirrors the CPU writer's 8-level cap. if cog: diff --git a/xrspatial/geotiff/tests/gpu/test_writer.py b/xrspatial/geotiff/tests/gpu/test_writer.py index 562934a65..901fae457 100644 --- a/xrspatial/geotiff/tests/gpu/test_writer.py +++ b/xrspatial/geotiff/tests/gpu/test_writer.py @@ -2472,14 +2472,14 @@ def test_to_geotiff_dask_cupy_gpu_false_streaming_multiband_3165( # --------------------------------------------------------------------------- -# Dask materialisation warning on the GPU write path (#3166) +# Dask streaming on the GPU write path (#3166) # -# to_geotiff's streaming contract only holds on the CPU path. dask+cupy -# input auto-dispatches to _write_geotiff_gpu, which computes the whole -# array on device; streaming_buffer_bytes is a no-op there. The writer -# now emits a GeoTIFFFallbackWarning at the compute site so callers -# learn the full array is being materialised. Plain (non-dask) cupy -# writes must stay silent: there is nothing lazy to materialise. +# dask input routed to the GPU writer streams one tile-row band at a +# time, bounded by streaming_buffer_bytes, and must produce a file +# byte-identical to the eager (in-memory cupy) write of the same data. +# Only cog=True still materialises the full array on device (overview +# generation needs it) and emits the GeoTIFFFallbackWarning; every +# other dask GPU write must stay silent. # --------------------------------------------------------------------------- @@ -2509,25 +2509,31 @@ def _gradient_da_3166(backing): @_gpu_only -def test_to_geotiff_dask_cupy_warns_on_materialise_3166(tmp_path): - """Auto-dispatched dask+cupy writes warn that the array is - materialised on device instead of streamed.""" +def test_to_geotiff_dask_cupy_streams_no_warning_3166(tmp_path): + """Auto-dispatched dask+cupy writes stream per tile-row band: no + materialisation warning, and the file is byte-identical to the + eager write of the same data.""" import cupy import dask.array as dca arr = np.arange(64 * 64, dtype=np.float32).reshape(64, 64) da = _gradient_da_3166( dca.from_array(cupy.asarray(arr), chunks=(32, 32))) - path = str(tmp_path / "dask_cupy_materialise_3166.tif") + path = str(tmp_path / "dask_cupy_stream_3166.tif") with warnings.catch_warnings(record=True) as records: warnings.simplefilter("always") to_geotiff(da, path) - assert len(_materialise_warnings_3166(records)) == 1 + assert _materialise_warnings_3166(records) == [] out = open_geotiff(path) np.testing.assert_array_equal(out.values, arr) + eager_path = str(tmp_path / "eager_cupy_3166.tif") + to_geotiff(_gradient_da_3166(cupy.asarray(arr)), eager_path) + assert (pathlib.Path(path).read_bytes() + == pathlib.Path(eager_path).read_bytes()) + @_gpu_only def test_to_geotiff_plain_cupy_no_materialise_warning_3166(tmp_path): @@ -2549,9 +2555,9 @@ def test_to_geotiff_plain_cupy_no_materialise_warning_3166(tmp_path): @_gpu_only -def test_write_geotiff_gpu_positional_dask_warns_3166(tmp_path): - """The positional (non-DataArray) compute site in - ``_write_geotiff_gpu`` emits the same warning.""" +def test_write_geotiff_gpu_positional_dask_streams_3166(tmp_path): + """Positional (non-DataArray) dask input streams through + ``_write_geotiff_gpu`` without the materialisation warning.""" import cupy import dask.array as dca @@ -2563,13 +2569,16 @@ def test_write_geotiff_gpu_positional_dask_warns_3166(tmp_path): warnings.simplefilter("always") _write_geotiff_gpu(lazy, path) - assert len(_materialise_warnings_3166(records)) == 1 + assert _materialise_warnings_3166(records) == [] + out = open_geotiff(path) + np.testing.assert_array_equal(out.values, arr) @_gpu_only -def test_to_geotiff_dask_numpy_gpu_true_warns_3166(tmp_path): +def test_to_geotiff_dask_numpy_gpu_true_streams_3166(tmp_path): """``gpu=True`` with dask+numpy input forces GPU dispatch (no - ``_is_gpu_data`` auto-detect involved) and warns the same way.""" + ``_is_gpu_data`` auto-detect involved); each band is uploaded to + the device per compute, with no materialisation warning.""" import dask.array as dca arr = np.arange(64 * 64, dtype=np.float32).reshape(64, 64) @@ -2580,6 +2589,110 @@ def test_to_geotiff_dask_numpy_gpu_true_warns_3166(tmp_path): warnings.simplefilter("always") to_geotiff(da, path, gpu=True) + assert _materialise_warnings_3166(records) == [] + out = open_geotiff(path) + np.testing.assert_array_equal(out.values, arr) + + +@_gpu_only +def test_to_geotiff_dask_cupy_cog_warns_on_materialise_3166(tmp_path): + """``cog=True`` still materialises dask input in full on device + (overview generation needs the whole array) and warns.""" + import cupy + import dask.array as dca + + arr = np.arange(128 * 128, dtype=np.float32).reshape(128, 128) + da = xr.DataArray( + dca.from_array(cupy.asarray(arr), chunks=(64, 64)), + dims=("y", "x"), + attrs={ + "crs": 4326, + "transform": (1.0, 0.0, -0.5, 0.0, -1.0, 127.5), + }, + ) + path = str(tmp_path / "dask_cupy_cog_3166.tif") + + with warnings.catch_warnings(record=True) as records: + warnings.simplefilter("always") + to_geotiff(da, path, cog=True, tile_size=64) + assert len(_materialise_warnings_3166(records)) == 1 out = open_geotiff(path) np.testing.assert_array_equal(out.values, arr) + + +@_gpu_only +def test_gpu_streaming_small_buffer_byte_identical_3166(tmp_path): + """A tiny ``streaming_buffer_bytes`` forces one band per tile-row + (the floor) over an odd-sized raster with NaN holes and a nodata + sentinel; the streamed file must stay byte-identical to the eager + write and round-trip the same valid-pixel set.""" + import cupy + import dask.array as dca + + rng = np.random.default_rng(3166) + arr = rng.random((70, 52)).astype(np.float32) + arr[5:9, 3:7] = np.nan + da_kwargs = dict( + dims=("y", "x"), + attrs={ + "crs": 4326, + "transform": (1.0, 0.0, -0.5, 0.0, -1.0, 69.5), + }, + ) + lazy = xr.DataArray( + dca.from_array(cupy.asarray(arr), chunks=(24, 52)), **da_kwargs) + stream_path = str(tmp_path / "stream_small_buf_3166.tif") + with warnings.catch_warnings(record=True) as records: + warnings.simplefilter("always") + to_geotiff(lazy, stream_path, nodata=-9999.0, tile_size=32, + streaming_buffer_bytes=1) + assert _materialise_warnings_3166(records) == [] + + eager_path = str(tmp_path / "eager_small_buf_3166.tif") + to_geotiff(xr.DataArray(cupy.asarray(arr), **da_kwargs), eager_path, + nodata=-9999.0, tile_size=32) + + assert (pathlib.Path(stream_path).read_bytes() + == pathlib.Path(eager_path).read_bytes()) + # The per-band NaN->sentinel rewrite must land on disk: the + # default read keeps the sentinel, so NaN holes come back as the + # nodata value. + expected = arr.copy() + expected[np.isnan(arr)] = -9999.0 + out = open_geotiff(stream_path) + np.testing.assert_array_equal(out.values, expected) + + +@_gpu_only +def test_gpu_streaming_band_first_byte_identical_3166(tmp_path): + """Band-first (band, y, x) dask+cupy input remaps lazily and + streams to the same bytes as the eager band-first write.""" + import cupy + import dask.array as dca + + rng = np.random.default_rng(31663) + arr = rng.random((3, 64, 48)).astype(np.float32) + da_kwargs = dict( + dims=("band", "y", "x"), + attrs={ + "crs": 4326, + "transform": (1.0, 0.0, -0.5, 0.0, -1.0, 63.5), + }, + ) + lazy = xr.DataArray( + dca.from_array(cupy.asarray(arr), chunks=(3, 32, 48)), **da_kwargs) + stream_path = str(tmp_path / "stream_band_first_3166.tif") + with warnings.catch_warnings(record=True) as records: + warnings.simplefilter("always") + to_geotiff(lazy, stream_path, tile_size=32) + assert _materialise_warnings_3166(records) == [] + + eager_path = str(tmp_path / "eager_band_first_3166.tif") + to_geotiff(xr.DataArray(cupy.asarray(arr), **da_kwargs), eager_path, + tile_size=32) + + assert (pathlib.Path(stream_path).read_bytes() + == pathlib.Path(eager_path).read_bytes()) + out = open_geotiff(stream_path) + np.testing.assert_array_equal(out.values, np.moveaxis(arr, 0, -1)) From b7420fdd43d2609e8d8499a1d51d799a234c85c0 Mon Sep 17 00:00:00 2001 From: Brendan Collins Date: Thu, 11 Jun 2026 13:05:39 -0700 Subject: [PATCH 2/2] Address review: scope streaming docs to device memory, add band-last and BytesIO tests (#3166) --- xrspatial/geotiff/_gpu_decode.py | 7 +- xrspatial/geotiff/_writers/eager.py | 11 ++- xrspatial/geotiff/_writers/gpu.py | 5 +- xrspatial/geotiff/tests/gpu/test_writer.py | 105 +++++++++++++++++---- 4 files changed, 102 insertions(+), 26 deletions(-) diff --git a/xrspatial/geotiff/_gpu_decode.py b/xrspatial/geotiff/_gpu_decode.py index e6aabeea8..c52714d0c 100644 --- a/xrspatial/geotiff/_gpu_decode.py +++ b/xrspatial/geotiff/_gpu_decode.py @@ -3129,9 +3129,10 @@ def gpu_compress_tiles(d_image, tile_width, tile_height, # path. Say so instead of silently compressing at the # library default (#3167). The GPU writer calls this # function once per IFD part (full resolution plus each - # COG overview level); the default warning filter dedups - # by location, so normal runs see the message once, but - # ``-W always`` repeats it per part. + # COG overview level), or once per tile-row band when + # streaming dask input (#3166); the default warning filter + # dedups by location, so normal runs see the message once, + # but ``-W always`` repeats it per call. warnings.warn( f"compression_level={compression_level} is ignored by " "the nvCOMP GPU encoder; tiles are compressed at the " diff --git a/xrspatial/geotiff/_writers/eager.py b/xrspatial/geotiff/_writers/eager.py index 41860596b..e3c443426 100644 --- a/xrspatial/geotiff/_writers/eager.py +++ b/xrspatial/geotiff/_writers/eager.py @@ -112,10 +112,13 @@ def to_geotiff(data: xr.DataArray | np.ndarray, ``gpu=True`` with any dask backing) also streams: each tile-row band is computed onto the device, compressed, and released before the next, with the per-compute budget capped by - ``streaming_buffer_bytes`` (issue #3166). The exception is - ``cog=True``, which materialises the full array on device because - overview generation needs it, and emits a - ``GeoTIFFFallbackWarning`` when it does. + ``streaming_buffer_bytes`` (issue #3166). The bound is on device + memory only: the GPU writer still assembles the compressed file + in host RAM before writing it out, unlike the CPU streaming path, + which writes incrementally to disk. The exception is ``cog=True``, + which materialises the full array on device because overview + generation needs it, and emits a ``GeoTIFFFallbackWarning`` when + it does. Automatically dispatches to GPU compression when: - ``gpu=True`` is passed, or diff --git a/xrspatial/geotiff/_writers/gpu.py b/xrspatial/geotiff/_writers/gpu.py index d2260b26f..9b68eae9d 100644 --- a/xrspatial/geotiff/_writers/gpu.py +++ b/xrspatial/geotiff/_writers/gpu.py @@ -252,7 +252,10 @@ def _write_geotiff_gpu(data: xr.DataArray | cupy.ndarray | np.ndarray, chunk-row span under this budget the same way the CPU streaming writer does; the floor is one full-width tile-row, so the cap is soft, and the tile-extraction buffer adds about - one band-sized allocation on top. In-memory (cupy / numpy) + one band-sized allocation on top. The cap bounds device + memory only: the compressed tile bytes accumulate in host RAM + until the file is assembled and written, matching the + non-streaming GPU write. In-memory (cupy / numpy) input ignores the kwarg. ``cog=True`` still materialises dask input in full on device (overview generation needs the whole array) and emits a ``GeoTIFFFallbackWarning``. diff --git a/xrspatial/geotiff/tests/gpu/test_writer.py b/xrspatial/geotiff/tests/gpu/test_writer.py index 901fae457..8341b70d3 100644 --- a/xrspatial/geotiff/tests/gpu/test_writer.py +++ b/xrspatial/geotiff/tests/gpu/test_writer.py @@ -2633,15 +2633,20 @@ def test_gpu_streaming_small_buffer_byte_identical_3166(tmp_path): rng = np.random.default_rng(3166) arr = rng.random((70, 52)).astype(np.float32) arr[5:9, 3:7] = np.nan - da_kwargs = dict( - dims=("y", "x"), - attrs={ - "crs": 4326, - "transform": (1.0, 0.0, -0.5, 0.0, -1.0, 69.5), - }, - ) + + # Fresh dims/attrs per DataArray so the streamed and eager writes + # cannot observe each other through a shared attrs dict. + def da_kwargs(): + return dict( + dims=("y", "x"), + attrs={ + "crs": 4326, + "transform": (1.0, 0.0, -0.5, 0.0, -1.0, 69.5), + }, + ) + lazy = xr.DataArray( - dca.from_array(cupy.asarray(arr), chunks=(24, 52)), **da_kwargs) + dca.from_array(cupy.asarray(arr), chunks=(24, 52)), **da_kwargs()) stream_path = str(tmp_path / "stream_small_buf_3166.tif") with warnings.catch_warnings(record=True) as records: warnings.simplefilter("always") @@ -2650,7 +2655,7 @@ def test_gpu_streaming_small_buffer_byte_identical_3166(tmp_path): assert _materialise_warnings_3166(records) == [] eager_path = str(tmp_path / "eager_small_buf_3166.tif") - to_geotiff(xr.DataArray(cupy.asarray(arr), **da_kwargs), eager_path, + to_geotiff(xr.DataArray(cupy.asarray(arr), **da_kwargs()), eager_path, nodata=-9999.0, tile_size=32) assert (pathlib.Path(stream_path).read_bytes() @@ -2673,15 +2678,18 @@ def test_gpu_streaming_band_first_byte_identical_3166(tmp_path): rng = np.random.default_rng(31663) arr = rng.random((3, 64, 48)).astype(np.float32) - da_kwargs = dict( - dims=("band", "y", "x"), - attrs={ - "crs": 4326, - "transform": (1.0, 0.0, -0.5, 0.0, -1.0, 63.5), - }, - ) + + def da_kwargs(): + return dict( + dims=("band", "y", "x"), + attrs={ + "crs": 4326, + "transform": (1.0, 0.0, -0.5, 0.0, -1.0, 63.5), + }, + ) + lazy = xr.DataArray( - dca.from_array(cupy.asarray(arr), chunks=(3, 32, 48)), **da_kwargs) + dca.from_array(cupy.asarray(arr), chunks=(3, 32, 48)), **da_kwargs()) stream_path = str(tmp_path / "stream_band_first_3166.tif") with warnings.catch_warnings(record=True) as records: warnings.simplefilter("always") @@ -2689,10 +2697,71 @@ def test_gpu_streaming_band_first_byte_identical_3166(tmp_path): assert _materialise_warnings_3166(records) == [] eager_path = str(tmp_path / "eager_band_first_3166.tif") - to_geotiff(xr.DataArray(cupy.asarray(arr), **da_kwargs), eager_path, + to_geotiff(xr.DataArray(cupy.asarray(arr), **da_kwargs()), eager_path, tile_size=32) assert (pathlib.Path(stream_path).read_bytes() == pathlib.Path(eager_path).read_bytes()) out = open_geotiff(stream_path) np.testing.assert_array_equal(out.values, np.moveaxis(arr, 0, -1)) + + +@_gpu_only +def test_gpu_streaming_band_last_byte_identical_3166(tmp_path): + """Band-last (y, x, band) dask input needs no remap; the stream + loop slices an already band-last 3D dask array along y and must + match the eager write byte for byte.""" + import cupy + import dask.array as dca + + rng = np.random.default_rng(31664) + arr = rng.random((64, 48, 3)).astype(np.float32) + + def da_kwargs(): + return dict( + dims=("y", "x", "band"), + attrs={ + "crs": 4326, + "transform": (1.0, 0.0, -0.5, 0.0, -1.0, 63.5), + }, + ) + + lazy = xr.DataArray( + dca.from_array(cupy.asarray(arr), chunks=(32, 48, 3)), **da_kwargs()) + stream_path = str(tmp_path / "stream_band_last_3166.tif") + with warnings.catch_warnings(record=True) as records: + warnings.simplefilter("always") + to_geotiff(lazy, stream_path, tile_size=32) + assert _materialise_warnings_3166(records) == [] + + eager_path = str(tmp_path / "eager_band_last_3166.tif") + to_geotiff(xr.DataArray(cupy.asarray(arr), **da_kwargs()), eager_path, + tile_size=32) + + assert (pathlib.Path(stream_path).read_bytes() + == pathlib.Path(eager_path).read_bytes()) + out = open_geotiff(stream_path) + np.testing.assert_array_equal(out.values, arr) + + +@_gpu_only +def test_write_geotiff_gpu_dask_to_bytesio_streams_3166(tmp_path): + """File-like destinations stay supported on the non-COG GPU path: + dask input streams into a BytesIO and produces the same bytes as + the write to a filesystem path.""" + import io + + import cupy + import dask.array as dca + + arr = np.arange(48 * 40, dtype=np.float32).reshape(48, 40) + lazy = dca.from_array(cupy.asarray(arr), chunks=(16, 40)) + buf = io.BytesIO() + with warnings.catch_warnings(record=True) as records: + warnings.simplefilter("always") + _write_geotiff_gpu(lazy, buf, tile_size=32) + assert _materialise_warnings_3166(records) == [] + + path = str(tmp_path / "dask_bytesio_3166.tif") + _write_geotiff_gpu(cupy.asarray(arr), path, tile_size=32) + assert buf.getvalue() == pathlib.Path(path).read_bytes()