Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
118 changes: 118 additions & 0 deletions xrspatial/geotiff/tests/_geotiff_fixtures.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
"""Shared GeoTIFF fixture builder for the test suite.

Consolidates the hand-built TIFF helpers that were duplicated across
test modules, exposing a single parameterised ``write_minimal_tiff``
that supports GeoKey directory and/or GeoAsciiParams payloads.

Migrated from:
- ``unit/test_geotags.py::_write_tiff_with_geokeys`` (#2417)
- ``unit/test_metadata.py::_write_minimal_tiff_with_wkt`` (#1987)
"""
from __future__ import annotations

import struct
from pathlib import Path

import numpy as np


def write_minimal_tiff(
path: str,
*,
geokeys: list[int] | None = None,
geo_ascii: str | None = None,
) -> None:
"""Build a 4x4 float32 TIFF with optional GeoKey directory and/or
GeoAsciiParams tags, then write it to ``path``.

Parameters
----------
path : str
Destination file path.
geokeys : list of int, optional
SHORT values for the GeoKeyDirectory tag (34735). If given, the
tag is emitted directly; no automatic header is prepended.
geo_ascii : str, optional
ASCII string for the GeoAsciiParams tag (34737). A trailing ``|``
separator and NULL terminator are added automatically.
"""
bo = '<'
pixels = np.zeros((4, 4), dtype=np.float32).tobytes()

tag_list: list[tuple[int, int, int, bytes]] = []

def add_short(tag, val):
tag_list.append((tag, 3, 1, struct.pack(f'{bo}H', val)))

def add_long(tag, val):
tag_list.append((tag, 4, 1, struct.pack(f'{bo}I', val)))

def add_shorts(tag, vals):
tag_list.append((tag, 3, len(vals),
struct.pack(f'{bo}{len(vals)}H', *vals)))

def add_doubles(tag, vals):
tag_list.append((tag, 12, len(vals),
struct.pack(f'{bo}{len(vals)}d', *vals)))

def add_ascii(tag, raw_bytes):
if not raw_bytes.endswith(b'\x00'):
raw_bytes = raw_bytes + b'\x00'
tag_list.append((tag, 2, len(raw_bytes), raw_bytes))

add_short(256, 4)
add_short(257, 4)
add_short(258, 32)
add_short(259, 1)
add_short(262, 1)
add_short(277, 1)
add_short(339, 3)
add_short(278, 4)
add_long(273, 0)
add_long(279, len(pixels))
add_doubles(33550, [1.0, 1.0, 0.0])
add_doubles(33922, [0.0, 0.0, 0.0, 0.0, 0.0, 0.0])

if geokeys is not None:
add_shorts(34735, geokeys)
if geo_ascii is not None:
ascii_buf = bytearray((geo_ascii + '|').encode('ascii'))
add_ascii(34737, bytes(ascii_buf))

tag_list.sort(key=lambda t: t[0])

n = len(tag_list)
ifd_start = 8
ifd_size = 2 + 12 * n + 4
overflow_start = ifd_start + ifd_size
overflow_buf = bytearray()
tag_offsets: dict[int, int | None] = {}
for tag, _typ, _count, raw in tag_list:
if len(raw) > 4:
tag_offsets[tag] = len(overflow_buf)
overflow_buf.extend(raw)
if len(overflow_buf) % 2:
overflow_buf.append(0)
else:
tag_offsets[tag] = None
pixel_data_start = overflow_start + len(overflow_buf)
patched = []
for tag, typ, count, raw in tag_list:
if tag == 273:
raw = struct.pack(f'{bo}I', pixel_data_start)
patched.append((tag, typ, count, raw))
tag_list = patched
out = bytearray(b'II')
out.extend(struct.pack(f'{bo}H', 42))
out.extend(struct.pack(f'{bo}I', ifd_start))
out.extend(struct.pack(f'{bo}H', n))
for tag, typ, count, raw in tag_list:
out.extend(struct.pack(f'{bo}HHI', tag, typ, count))
if len(raw) <= 4:
out.extend(raw.ljust(4, b'\x00'))
else:
out.extend(struct.pack(f'{bo}I', overflow_start + tag_offsets[tag]))
out.extend(struct.pack(f'{bo}I', 0))
out.extend(overflow_buf)
out.extend(pixels)
Path(path).write_bytes(bytes(out))
84 changes: 2 additions & 82 deletions xrspatial/geotiff/tests/unit/test_geotags.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
from __future__ import annotations

import struct
from pathlib import Path

import numpy as np
import pytest
Expand All @@ -18,6 +17,7 @@
from xrspatial.geotiff._validation import (_check_read_inconsistent_geokeys,
_registered_read_metadata_checks, validate_read_metadata)

from .._geotiff_fixtures import write_minimal_tiff
from ..conftest import make_minimal_tiff


Expand Down Expand Up @@ -747,94 +747,14 @@ def _write_tiff_with_geokeys(
projected_cs_type: int | None,
geographic_type: int | None,
) -> None:
"""Hand-build a 4x4 float32 TIFF with a tiny GeoKey directory.

Only ``ModelTypeGeoKey`` plus optional ``ProjectedCSTypeGeoKey`` and
``GeographicTypeGeoKey`` are written; the rest of the directory is
minimal. Pass ``None`` to either type-key argument to omit it.

Unique fixture for issue #2417 -- name carries the issue number so
parallel test runs and other worktrees do not collide on tmp paths.
"""
bo = '<'
pixels = np.zeros((4, 4), dtype=np.float32).tobytes()

# GeoKeyDirectory header: (version, rev_maj, rev_min, n_keys)
n_keys = 1 + int(projected_cs_type is not None) + int(geographic_type is not None)
gkd = [1, 1, 0, n_keys]
# ModelTypeGeoKey: id 1024, location 0 (immediate value), count 1, value.
gkd.extend([1024, 0, 1, model_type])
if projected_cs_type is not None:
gkd.extend([3072, 0, 1, projected_cs_type])
if geographic_type is not None:
gkd.extend([2048, 0, 1, geographic_type])

tag_list = []

def add_short(tag, val):
tag_list.append((tag, 3, 1, struct.pack(f'{bo}H', val)))

def add_long(tag, val):
tag_list.append((tag, 4, 1, struct.pack(f'{bo}I', val)))

def add_shorts(tag, vals):
tag_list.append((tag, 3, len(vals),
struct.pack(f'{bo}{len(vals)}H', *vals)))

def add_doubles(tag, vals):
tag_list.append((tag, 12, len(vals),
struct.pack(f'{bo}{len(vals)}d', *vals)))

add_short(256, 4)
add_short(257, 4)
add_short(258, 32)
add_short(259, 1)
add_short(262, 1)
add_short(277, 1)
add_short(339, 3)
add_short(278, 4)
add_long(273, 0)
add_long(279, len(pixels))
add_doubles(33550, [1.0, 1.0, 0.0])
add_doubles(33922, [0.0, 0.0, 0.0, 0.0, 0.0, 0.0])
add_shorts(34735, gkd)
tag_list.sort(key=lambda t: t[0])

n = len(tag_list)
ifd_start = 8
ifd_size = 2 + 12 * n + 4
overflow_start = ifd_start + ifd_size
overflow_buf = bytearray()
tag_offsets: dict[int, int | None] = {}
for tag, _typ, _count, raw in tag_list:
if len(raw) > 4:
tag_offsets[tag] = len(overflow_buf)
overflow_buf.extend(raw)
if len(overflow_buf) % 2:
overflow_buf.append(0)
else:
tag_offsets[tag] = None
pixel_data_start = overflow_start + len(overflow_buf)
patched = []
for tag, typ, count, raw in tag_list:
if tag == 273:
raw = struct.pack(f'{bo}I', pixel_data_start)
patched.append((tag, typ, count, raw))
tag_list = patched
out = bytearray(b'II')
out.extend(struct.pack(f'{bo}H', 42))
out.extend(struct.pack(f'{bo}I', ifd_start))
out.extend(struct.pack(f'{bo}H', n))
for tag, typ, count, raw in tag_list:
out.extend(struct.pack(f'{bo}HHI', tag, typ, count))
if len(raw) <= 4:
out.extend(raw.ljust(4, b'\x00'))
else:
out.extend(struct.pack(f'{bo}I', overflow_start + tag_offsets[tag]))
out.extend(struct.pack(f'{bo}I', 0))
out.extend(overflow_buf)
out.extend(pixels)
Path(path).write_bytes(bytes(out))
write_minimal_tiff(path, geokeys=gkd)


def test_open_geotiff_rejects_model_geographic_with_projected_key(tmp_path):
Expand Down
89 changes: 4 additions & 85 deletions xrspatial/geotiff/tests/unit/test_metadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import pytest
import xarray as xr

from .._geotiff_fixtures import write_minimal_tiff

import xrspatial.geotiff as geotiff_pkg
from xrspatial.geotiff import (ConflictingCRSError, GeoTIFFAmbiguousMetadataError,
MixedBandMetadataError, _runtime)
Expand Down Expand Up @@ -1441,92 +1443,9 @@ def _da(*, coords=None, attrs=None, shape=(4, 4)):


def _write_minimal_tiff_with_wkt(path: str, wkt: str) -> None:
"""Hand-build a 4x4 float32 TIFF that stashes ``wkt`` in
``GeoAsciiParams`` (tag 34737), referenced from
``GeoKeyDirectory`` (tag 34735) as ``GTCitationGeoKey`` (id 1026)."""
bo = '<'
pixels = np.zeros((4, 4), dtype=np.float32).tobytes()
ascii_buf = bytearray((wkt + '|').encode('ascii'))
# GeoKeyDirectory: header + one entry pointing into GeoAsciiParams.
gkd = [
1, 1, 0, 1,
1026, # GTCitationGeoKey
34737, # location = GeoAsciiParams tag
len(wkt) + 1, # count (incl. '|')
0, # offset into GeoAsciiParams
]
tag_list = []

def add_short(tag, val):
tag_list.append((tag, 3, 1, struct.pack(f'{bo}H', val)))

def add_long(tag, val):
tag_list.append((tag, 4, 1, struct.pack(f'{bo}I', val)))

def add_shorts(tag, vals):
tag_list.append((tag, 3, len(vals),
struct.pack(f'{bo}{len(vals)}H', *vals)))

def add_doubles(tag, vals):
tag_list.append((tag, 12, len(vals),
struct.pack(f'{bo}{len(vals)}d', *vals)))

def add_ascii(tag, raw_bytes):
if not raw_bytes.endswith(b'\x00'):
raw_bytes = raw_bytes + b'\x00'
tag_list.append((tag, 2, len(raw_bytes), raw_bytes))

add_short(256, 4)
add_short(257, 4)
add_short(258, 32)
add_short(259, 1)
add_short(262, 1)
add_short(277, 1)
add_short(339, 3)
add_short(278, 4)
add_long(273, 0)
add_long(279, len(pixels))
add_doubles(33550, [1.0, 1.0, 0.0])
add_doubles(33922, [0.0, 0.0, 0.0, 0.0, 0.0, 0.0])
add_shorts(34735, gkd)
add_ascii(34737, bytes(ascii_buf))
tag_list.sort(key=lambda t: t[0])

n = len(tag_list)
ifd_start = 8
ifd_size = 2 + 12 * n + 4
overflow_start = ifd_start + ifd_size
overflow_buf = bytearray()
tag_offsets: dict[int, int | None] = {}
for tag, _typ, _count, raw in tag_list:
if len(raw) > 4:
tag_offsets[tag] = len(overflow_buf)
overflow_buf.extend(raw)
if len(overflow_buf) % 2:
overflow_buf.append(0)
else:
tag_offsets[tag] = None
pixel_data_start = overflow_start + len(overflow_buf)
patched = []
for tag, typ, count, raw in tag_list:
if tag == 273:
raw = struct.pack(f'{bo}I', pixel_data_start)
patched.append((tag, typ, count, raw))
tag_list = patched
out = bytearray(b'II')
out.extend(struct.pack(f'{bo}H', 42))
out.extend(struct.pack(f'{bo}I', ifd_start))
out.extend(struct.pack(f'{bo}H', n))
for tag, typ, count, raw in tag_list:
out.extend(struct.pack(f'{bo}HHI', tag, typ, count))
if len(raw) <= 4:
out.extend(raw.ljust(4, b'\x00'))
else:
out.extend(struct.pack(f'{bo}I', overflow_start + tag_offsets[tag]))
out.extend(struct.pack(f'{bo}I', 0))
out.extend(overflow_buf)
out.extend(pixels)
Path(path).write_bytes(bytes(out))
gkd = [1, 1, 0, 1, 1026, 34737, len(wkt) + 1, 0]
write_minimal_tiff(path, geokeys=gkd, geo_ascii=wkt)


def _write_rotated_vrt(
Expand Down
Loading