Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions xrspatial/geotiff/_gpu_decode.py
Original file line number Diff line number Diff line change
Expand Up @@ -3129,9 +3129,10 @@ def gpu_compress_tiles(d_image, tile_width, tile_height,
# path. Say so instead of silently compressing at the
# library default (#3167). The GPU writer calls this
# function once per IFD part (full resolution plus each
# COG overview level); the default warning filter dedups
# by location, so normal runs see the message once, but
# ``-W always`` repeats it per part.
# COG overview level), or once per tile-row band when
# streaming dask input (#3166); the default warning filter
# dedups by location, so normal runs see the message once,
# but ``-W always`` repeats it per call.
warnings.warn(
f"compression_level={compression_level} is ignored by "
"the nvCOMP GPU encoder; tiles are compressed at the "
Expand Down
36 changes: 20 additions & 16 deletions xrspatial/geotiff/_writers/eager.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,13 +108,17 @@ def to_geotiff(data: xr.DataArray | np.ndarray,
chunk-rows at once (#3007). COG output (``cog=True``) still
materialises because overviews need the full array.

Streaming does NOT apply to the GPU writer. CuPy-backed dask input
(dask+cupy) auto-dispatches to the GPU writer, which materialises
the entire array on device before encoding;
``streaming_buffer_bytes`` is ignored there and a
``GeoTIFFFallbackWarning`` is emitted when the materialisation
happens (issue #3166). The same applies when ``gpu=True`` is passed
with any dask-backed input.
Dask input routed to the GPU writer (auto-detected dask+cupy, or
``gpu=True`` with any dask backing) also streams: each tile-row
band is computed onto the device, compressed, and released before
the next, with the per-compute budget capped by
``streaming_buffer_bytes`` (issue #3166). The bound is on device
memory only: the GPU writer still assembles the compressed file
in host RAM before writing it out, unlike the CPU streaming path,
which writes incrementally to disk. The exception is ``cog=True``,
which materialises the full array on device because overview
generation needs it, and emits a ``GeoTIFFFallbackWarning`` when
it does.

Automatically dispatches to GPU compression when:
- ``gpu=True`` is passed, or
Expand Down Expand Up @@ -252,20 +256,20 @@ def to_geotiff(data: xr.DataArray | np.ndarray,
acceleration; backend parity with the CPU writer is tested for
the Tier 1 codec set only. Force GPU compression. None
(default) auto-detects CuPy data, including CuPy-backed dask
arrays. Dask-backed input routed to the GPU writer is
materialised in full on device (no streaming; see
arrays. Dask-backed input routed to the GPU writer streams one
tile-row band at a time unless ``cog=True`` (see
``streaming_buffer_bytes``).
streaming_buffer_bytes : int
[stable] Soft cap on bytes materialised per dask compute call
when streaming a dask-backed DataArray. Defaults to 256 MB.
Wide rasters whose tile-row exceeds this budget are split into
horizontal segments. Only applies to dask-backed inputs on the
CPU write path; the kwarg is a no-op for numpy / CuPy / COG
paths (the COG path materialises the full array because the
overview pyramid needs it) and for the GPU writer. dask+cupy
input auto-dispatches to the GPU writer, so it materialises in
full on device instead of streaming and emits a
``GeoTIFFFallbackWarning`` (issue #3166).
horizontal segments on the CPU path. On the GPU path the cap
bounds the device bytes computed per tile-row band, with a
floor of one full-width tile-row (issue #3166). The kwarg is a
no-op for in-memory (numpy / CuPy) input and for COG output,
which materialises the full array because the overview pyramid
needs it; a dask-backed ``cog=True`` GPU write emits a
``GeoTIFFFallbackWarning`` when it materialises.
max_z_error : float
[experimental] Per-pixel error budget for LERC compression.
``0.0`` (default) is lossless; larger values let the encoder
Expand Down
171 changes: 131 additions & 40 deletions xrspatial/geotiff/_writers/gpu.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,17 +35,18 @@
def _warn_dask_materialised() -> None:
"""Warn that a dask-backed input is about to be fully materialised.

The GPU writer has no streaming mode: ``to_geotiff``'s dask
streaming contract only applies to the CPU path, and dask+cupy
input auto-dispatches here (issue #3166). Emitted from both
``.compute()`` sites in ``_write_geotiff_gpu`` so explicit and
auto-detected GPU writes warn the same way.
Non-COG dask input streams through the GPU writer one tile-row
band at a time (issue #3166), so this fires only from the
``cog=True`` ``.compute()`` sites in ``_write_geotiff_gpu``:
overview generation needs the full-resolution array on device.
Emitted from both the DataArray and positional compute sites so
explicit and auto-detected GPU writes warn the same way.

Deliberately does NOT participate in the
``XRSPATIAL_GEOTIFF_STRICT`` promotion that most
``GeoTIFFFallbackWarning`` sites apply: there is no opt-out flag
for the materialisation, so promotion would turn every dask+cupy
GPU write into a hard failure. Same shape as the JPEG /
COG write into a hard failure. Same shape as the JPEG /
experimental-codec opt-in warnings, which also stay warnings under
strict mode.

Expand All @@ -54,10 +55,10 @@ def _warn_dask_materialised() -> None:
has already run by the time the fallback fires.
"""
warnings.warn(
"Dask-backed input routed to the GPU writer is materialised "
"in full on device before encoding; the GPU writer has no "
"streaming mode and streaming_buffer_bytes is ignored. "
"See issue #3166.",
"Dask-backed input with cog=True is materialised in full on "
"device before encoding: GPU overview generation needs the "
"whole array, so streaming_buffer_bytes is ignored. Pass "
"cog=False to stream per tile-row band. See issue #3166.",
GeoTIFFFallbackWarning,
stacklevel=3,
)
Expand Down Expand Up @@ -143,9 +144,11 @@ def _write_geotiff_gpu(data: xr.DataArray | cupy.ndarray | np.ndarray,
----------
data : xr.DataArray (CuPy- or NumPy-backed), cupy.ndarray, or np.ndarray
[experimental] 2D or 3D raster. CuPy-backed inputs stay on
device; NumPy/Dask inputs are uploaded via
device; NumPy inputs are uploaded via
``cupy.asarray(np.asarray(data))`` before compression (matches
``to_geotiff`` parity).
``to_geotiff`` parity). Dask-backed inputs stream one tile-row
band at a time unless ``cog=True`` (see
``streaming_buffer_bytes``).
path : str or binary file-like
[experimental] Output file path or any object with a ``write``
method (e.g. ``io.BytesIO``). ``cog=True`` requires a string
Expand Down Expand Up @@ -241,13 +244,21 @@ def _write_geotiff_gpu(data: xr.DataArray | cupy.ndarray | np.ndarray,
auto-promotes when the estimated file size would exceed the
classic-TIFF 4 GB limit.
streaming_buffer_bytes : int
[internal-only] Accepted for API parity with ``to_geotiff``.
The GPU writer materialises the entire array on device and has
no streaming concept, so this kwarg is a no-op. Default matches
``to_geotiff`` (256 MB) so callers passing the same kwargs to
either entry point see the same default and the same type.
Dask-backed inputs emit a ``GeoTIFFFallbackWarning`` when they
are materialised (issue #3166).
[experimental] Soft cap on the bytes materialised on device
per dask compute call when the input is dask-backed and
``cog=False`` (issue #3166). Default matches ``to_geotiff``
(256 MB). The writer computes and compresses one tile-row band
at a time, grouping consecutive tile-rows by the source
chunk-row span under this budget the same way the CPU
streaming writer does; the floor is one full-width tile-row,
so the cap is soft, and the tile-extraction buffer adds about
one band-sized allocation on top. The cap bounds device
memory only: the compressed tile bytes accumulate in host RAM
until the file is assembled and written, matching the
non-streaming GPU write. In-memory (cupy / numpy)
input ignores the kwarg. ``cog=True`` still materialises
dask input in full on device (overview generation needs the
whole array) and emits a ``GeoTIFFFallbackWarning``.
max_z_error : float
[internal-only] Per-pixel error budget for LERC compression.
The GPU writer does not implement LERC (nvCOMP has no LERC
Expand Down Expand Up @@ -496,11 +507,7 @@ def _write_geotiff_gpu(data: xr.DataArray | cupy.ndarray | np.ndarray,
raise TypeError(
f"path must be a str or a binary file-like with a write() "
f"method, got {type(path).__name__}")
# streaming_buffer_bytes is intentionally a no-op on the GPU path;
# the kwarg exists for API parity with to_geotiff so callers can pass
# the same kwargs to both entry points without filtering.
del streaming_buffer_bytes
# ``pack`` is likewise a no-op here: ``to_geotiff`` applies the
# ``pack`` is a no-op here: ``to_geotiff`` applies the
# re-pack transform before dispatching to this writer, so the kwarg
# only needs to exist for signature parity (#3064).
del pack
Expand Down Expand Up @@ -542,13 +549,16 @@ def _write_geotiff_gpu(data: xr.DataArray | cupy.ndarray | np.ndarray,
if isinstance(data, xr.DataArray):
restore_sentinel = _should_restore_nan_sentinel(data.attrs)
arr = data.data
# Handle Dask arrays: compute to materialize
if hasattr(arr, 'compute'):
# Dask arrays: only cog=True materialises here (overview
# generation needs the full-resolution array on device).
# Otherwise the array stays lazy and the streaming loop below
# computes one tile-row band at a time (issue #3166).
if hasattr(arr, 'compute') and cog:
_warn_dask_materialised()
arr = arr.compute()
# Now arr should be CuPy or numpy
if hasattr(arr, 'get'):
pass # CuPy array, already on GPU
# Now arr should be CuPy, numpy, or a still-lazy dask array
if hasattr(arr, 'get') or hasattr(arr, 'compute'):
pass # CuPy (on GPU) or dask (uploaded per band below)
else:
arr = cupy.asarray(np.asarray(arr)) # numpy -> GPU

Expand All @@ -562,9 +572,15 @@ def _write_geotiff_gpu(data: xr.DataArray | cupy.ndarray | np.ndarray,
# rioxarray and CF-style multi-band rasters land here; without
# this remap the writer treats arr.shape[2] as the band axis and
# produces a transposed file. The CPU writer does the same remap
# at the matching step in to_geotiff().
# at the matching step in to_geotiff(). Dask input remaps
# lazily; the per-band ``ascontiguousarray`` in the streaming
# loop replaces the eager one here.
if arr.ndim == 3 and data.dims[0] in _BAND_DIM_NAMES:
arr = cupy.ascontiguousarray(cupy.moveaxis(arr, 0, -1))
if hasattr(arr, 'compute'):
import dask.array as _da
arr = _da.moveaxis(arr, 0, -1)
else:
arr = cupy.ascontiguousarray(cupy.moveaxis(arr, 0, -1))

# Resolve via the centralised resolver. Same precedence as the
# CPU writer: prefer attrs['transform'] (bit-stable on
Expand Down Expand Up @@ -615,10 +631,13 @@ def _write_geotiff_gpu(data: xr.DataArray | cupy.ndarray | np.ndarray,
y_res = _rich['y_resolution']
res_unit = _rich['resolution_unit']
else:
if hasattr(data, 'compute'):
if hasattr(data, 'compute') and cog:
# Overview generation needs the full array on device.
_warn_dask_materialised()
data = data.compute() # Dask -> CuPy or numpy
if hasattr(data, 'device'):
if hasattr(data, 'compute'):
arr = data # still-lazy dask; streamed per band below
elif hasattr(data, 'device'):
arr = data # already CuPy
elif hasattr(data, 'get'):
arr = data # CuPy
Expand All @@ -632,6 +651,11 @@ def _write_geotiff_gpu(data: xr.DataArray | cupy.ndarray | np.ndarray,
samples = arr.shape[2] if arr.ndim == 3 else 1
np_dtype = np.dtype(str(arr.dtype)) # cupy dtype -> numpy dtype

# Dask input that survived the cog gates above streams below: the
# full-resolution part is compressed one tile-row band at a time
# instead of materialising the whole array on device (issue #3166).
stream_dask = hasattr(arr, 'compute')

# Mirror the CPU writer's NaN-to-sentinel substitution.
# Without this step the GPU writer emits raw NaN bytes interleaved
# with valid data even when ``nodata=<finite>`` is supplied; the
Expand All @@ -651,11 +675,15 @@ def _write_geotiff_gpu(data: xr.DataArray | cupy.ndarray | np.ndarray,
# the cost is one GPU array allocation, only on the NaN-present
# path, and it guarantees the CPU writer's defensive-copy semantics
# in every case.
# Shared NodataLifecycle gate for NaN->sentinel restore.
if _NL(declared=nodata, dtype_in=np_dtype).writer_restore_sentinel(
buffer_dtype=np_dtype,
restore_sentinel=restore_sentinel,
):
# Shared NodataLifecycle gate for NaN->sentinel restore. The
# streaming path applies the same rewrite per tile-row band inside
# ``_gpu_stream_compress_to_part`` below, after each band is
# computed onto the device.
rewrite_nodata = _NL(declared=nodata, dtype_in=np_dtype).writer_restore_sentinel(
buffer_dtype=np_dtype,
restore_sentinel=restore_sentinel,
)
if rewrite_nodata and not stream_dask:
nan_mask = cupy.isnan(arr)
if bool(nan_mask.any()):
arr = arr.copy()
Expand All @@ -680,8 +708,71 @@ def _gpu_compress_to_part(gpu_arr, w, h, spp):
stub = np.empty((1, 1, spp) if spp > 1 else (1, 1), dtype=np_dtype)
return (stub, w, h, rel_off, bc, compressed)

# Full resolution
parts = [_gpu_compress_to_part(arr, width, height, samples)]
def _gpu_stream_compress_to_part(lazy_arr, w, h, spp):
"""Stream-compress a dask array into the same part tuple as
``_gpu_compress_to_part`` without materialising it in full.

Computes one tile-row band at a time, grouping consecutive
tile-rows by the source chunk-row span under
``streaming_buffer_bytes`` via the CPU streaming writer's
``_stream_row_bands`` helper (so a source chunked taller than
the tile is not recomputed once per tile-row, issue #3117 /
#3007). The floor is one full-width tile-row, so the budget is
a soft cap.

Per-band compression is byte-identical to one whole-image
``gpu_compress_tiles`` call: bands are tile-row aligned, tiles
are independent in the TIFF layout, and the tile-extraction
kernel zero-pads edge tiles per band exactly as it does for
the full image. The NaN->sentinel rewrite runs per band after
the compute, mirroring the full-array rewrite above.
"""
from .._writer import _stream_row_bands
bytes_per_src_row = max(1, w * np_dtype.itemsize * spp)
max_src_rows = max(1, streaming_buffer_bytes // bytes_per_src_row)
row_chunks = getattr(lazy_arr, 'chunks', None)
try:
row_bands = _stream_row_bands(
row_chunks[0] if row_chunks else None,
tile_size, h, max_src_rows)
except (TypeError, ValueError):
row_bands = _stream_row_bands(None, tile_size, h, 0)
compressed = []
for r0, r1 in row_bands:
band = lazy_arr[r0:r1].compute()
if hasattr(band, 'get'):
# CuPy band. ``da.moveaxis`` chunks may compute to
# transposed views; the tile-extraction kernel needs a
# contiguous device buffer (no-op when already
# contiguous).
band = cupy.ascontiguousarray(band)
else:
band = cupy.asarray(np.asarray(band)) # numpy -> GPU
if rewrite_nodata:
nan_mask = cupy.isnan(band)
if bool(nan_mask.any()):
band = band.copy()
band[nan_mask] = np_dtype.type(nodata)
compressed.extend(gpu_compress_tiles(
band, tile_size, tile_size, w, r1 - r0,
comp_tag, pred_val, np_dtype, spp,
compression_level=compression_level))
rel_off = []
bc = []
off = 0
for tile in compressed:
rel_off.append(off)
bc.append(len(tile))
off += len(tile)
stub = np.empty((1, 1, spp) if spp > 1 else (1, 1), dtype=np_dtype)
return (stub, w, h, rel_off, bc, compressed)

# Full resolution. Dask input streams (cog=True materialised it
# above, so ``stream_dask`` is False on the overview path).
if stream_dask:
parts = [_gpu_stream_compress_to_part(arr, width, height, samples)]
else:
parts = [_gpu_compress_to_part(arr, width, height, samples)]

# Overview generation mirrors the CPU writer's 8-level cap.
if cog:
Expand Down
Loading
Loading