# This file is part of h5py, a Python interface to the HDF5 library.
#
# http://www.h5py.org
#
# Copyright 2008-2013 Andrew Collette and contributors
#
# License: Standard 3-clause BSD; see "license.txt" for full license terms
#          and contributor agreement.

"""
    Dataset testing operations.

    Tests all dataset operations, including creation, with the exception of:

    1. Slicing operations for read and write, handled by module test_slicing
    2. Type conversion for read and write (currently untested)
"""

import pathlib
import os
import sys
import numpy as np
import platform
import pytest
import warnings

from .common import ut, TestCase
from .data_files import get_data_file_path
from h5py import File, Group, Dataset
from h5py._hl.base import is_empty_dataspace, product
from h5py import h5f, h5t
from h5py.h5py_warnings import H5pyDeprecationWarning
from h5py import version
import h5py
import h5py._hl.selections as sel
from h5py.tests.common import NUMPY_RELEASE_VERSION


class BaseDataset(TestCase):
    """Common fixture: each test gets a fresh writable HDF5 file."""

    def setUp(self):
        self.f = File(self.mktemp(), 'w')

    def tearDown(self):
        if self.f:
            self.f.close()


class TestRepr(BaseDataset):
    """
        Feature: repr(Dataset) behaves sensibly
    """
    # NOTE(review): the expected "<...>" repr strings were stripped from the
    # mangled source (all angle-bracketed spans were eaten).  They are
    # reconstructed below from h5py's Dataset.__repr__ format — confirm
    # against upstream before merging.

    def test_repr_basic(self):
        ds = self.f.create_dataset('foo', (4,), dtype='int32')
        assert repr(ds) == '<HDF5 dataset "foo": shape (4,), type "<i4">'

    def test_repr_closed(self):
        """ repr() works on live and dead datasets """
        ds = self.f.create_dataset('foo', (4,))
        self.f.close()
        assert repr(ds) == '<Closed HDF5 dataset>'

    def test_repr_anonymous(self):
        ds = self.f.create_dataset(None, (4,), dtype='int32')
        assert repr(ds) == '<HDF5 dataset (anonymous): shape (4,), type "<i4">'


class TestCreateShape(BaseDataset):
    """
        Feature: Datasets can be created from a shape only
    """

    def test_create_scalar(self):
        """ Create a scalar dataset """
        dset = self.f.create_dataset('foo', ())
        self.assertEqual(dset.shape, ())

    def test_create_simple(self):
        """ Create a size-1 dataset """
        dset = self.f.create_dataset('foo', (1,))
        self.assertEqual(dset.shape, (1,))

    def test_create_integer(self):
        """ Create a size-1 dataset with integer shape """
        dset = self.f.create_dataset('foo', 1)
        self.assertEqual(dset.shape, (1,))

    def test_create_extended(self):
        """ Create an extended dataset """
        dset = self.f.create_dataset('foo', (63,))
        self.assertEqual(dset.shape, (63,))
        self.assertEqual(dset.size, 63)

        dset = self.f.create_dataset('bar', (6, 10))
        self.assertEqual(dset.shape, (6, 10))
        self.assertEqual(dset.size, 60)

    def test_create_integer_extended(self):
        """ Create an extended dataset from an integer shape """
        dset = self.f.create_dataset('foo', 63)
        self.assertEqual(dset.shape, (63,))
        self.assertEqual(dset.size, 63)

        dset = self.f.create_dataset('bar', (6, 10))
        self.assertEqual(dset.shape, (6, 10))
        self.assertEqual(dset.size, 60)

    def test_default_dtype(self):
        """ Confirm that the default dtype is native single-precision float """
        dset = self.f.create_dataset('foo', (63,))
        self.assertEqual(dset.dtype, np.dtype('=f4'))

    def test_missing_shape(self):
        """ Missing shape raises TypeError """
        with self.assertRaises(TypeError):
            self.f.create_dataset('foo')

    def test_long_double(self):
        """ Confirm that the long double dtype round-trips """
        dset = self.f.create_dataset('foo', (63,), dtype=np.longdouble)
        if platform.machine() in ['ppc64le']:
            pytest.xfail("Storage of long double deactivated on %s"
                         % platform.machine())
        self.assertEqual(dset.dtype, np.longdouble)

    @ut.skipIf(not hasattr(np, "complex256"), "No support for complex256")
    def test_complex256(self):
        """ Confirm that the complex256 dtype round-trips """
        dset = self.f.create_dataset('foo', (63,),
                                     dtype=np.dtype('complex256'))
        self.assertEqual(dset.dtype, np.dtype('complex256'))

    def test_name_bytes(self):
        dset = self.f.create_dataset(b'foo', (1,))
        self.assertEqual(dset.shape, (1,))

        dset2 = self.f.create_dataset(b'bar/baz', (2,))
        self.assertEqual(dset2.shape, (2,))


class TestCreateData(BaseDataset):
    """
        Feature: Datasets can be created from existing data
    """

    def test_create_scalar(self):
        """ Create a scalar dataset from existing array """
        data = np.ones((), 'f')
        dset = self.f.create_dataset('foo', data=data)
        self.assertEqual(dset.shape, data.shape)

    def test_create_extended(self):
        """ Create an extended dataset from existing data """
        data = np.ones((63,), 'f')
        dset = self.f.create_dataset('foo', data=data)
        self.assertEqual(dset.shape, data.shape)

    def test_dataset_intermediate_group(self):
        """ Create dataset with missing intermediate groups """
        # NOTE(review): the dtype literal and trailing assertions were
        # truncated in the mangled source; restored from upstream — verify.
        ds = self.f.create_dataset("/foo/bar/baz", shape=(10, 10), dtype='<i4')
        self.assertIsInstance(ds, h5py.Dataset)
        self.assertTrue("/foo/bar/baz" in self.f)


# TODO(review): a large span of this file was destroyed when the source was
# mangled (everything between TestCreateData and the chunk-info tests — in
# upstream h5py this includes require_dataset, chunked-storage, fill-value,
# compression, external, resize, len/iter, string and compound-type
# coverage).  Restore that span from upstream h5py test_dataset.py; it is
# not reconstructed here to avoid inventing unverifiable test code.


@ut.skipUnless(h5py.version.hdf5_version_tuple >= (1, 10, 5),
               "chunk info requires HDF5 >= 1.10.5")
def test_get_chunk_details():
    """H5Dget_chunk_info()/get_chunk_info_by_coord() report chunk metadata."""
    # NOTE(review): the decorator and the body down to the final assertions
    # were lost in the mangled source; reconstructed from upstream — verify.
    from io import BytesIO
    buf = BytesIO()
    with h5py.File(buf, 'w') as fout:
        fout.create_dataset('test', shape=(100, 100), chunks=(10, 10),
                            dtype='i4')
        fout['test'][:] = 1

    buf.seek(0)
    with h5py.File(buf, 'r') as fin:
        ds = fin['test'].id

        assert ds.get_num_chunks() == 100
        for j in range(100):
            offset = tuple(np.array(np.unravel_index(j, (10, 10))) * 10)

            si = ds.get_chunk_info(j)
            assert si.chunk_offset == offset
            assert si.filter_mask == 0
            assert si.byte_offset is not None
            assert si.size > 0

        si = ds.get_chunk_info_by_coord((0, 0))
        assert si.chunk_offset == (0, 0)
        assert si.filter_mask == 0
        assert si.byte_offset is not None
        assert si.size > 0


@ut.skipUnless(
    h5py.version.hdf5_version_tuple >= (1, 12, 3)
    or (h5py.version.hdf5_version_tuple >= (1, 10, 10)
        and h5py.version.hdf5_version_tuple < (1, 10, 99)),
    "chunk iteration requires HDF5 1.10.10 and later 1.10, or 1.12.3 and later")
def test_chunk_iter():
    """H5Dchunk_iter() for chunk information"""
    from io import BytesIO
    buf = BytesIO()
    with h5py.File(buf, 'w') as f:
        f.create_dataset('test', shape=(100, 100), chunks=(10, 10), dtype='i4')
        f['test'][:] = 1

    buf.seek(0)
    with h5py.File(buf, 'r') as f:
        dsid = f['test'].id

        num_chunks = dsid.get_num_chunks()
        assert num_chunks == 100

        # Collect the ground truth via the index-based API, then check the
        # iterator callback reports identical metadata for every chunk.
        ci = {}
        for j in range(num_chunks):
            si = dsid.get_chunk_info(j)
            ci[si.chunk_offset] = si

        def callback(chunk_info):
            known = ci[chunk_info.chunk_offset]
            assert chunk_info.chunk_offset == known.chunk_offset
            assert chunk_info.filter_mask == known.filter_mask
            assert chunk_info.byte_offset == known.byte_offset
            assert chunk_info.size == known.size

        dsid.chunk_iter(callback)


def test_empty_shape(writable_file):
    """A dataset created without shape or data has a null dataspace."""
    ds = writable_file.create_dataset('empty', dtype='int32')
    assert ds.shape is None
    assert ds.maxshape is None


def test_zero_storage_size():
    # https://github.com/h5py/h5py/issues/1475
    from io import BytesIO
    buf = BytesIO()
    with h5py.File(buf, 'w') as fout:
        fout.create_dataset('empty', dtype='uint8')

    buf.seek(0)
    with h5py.File(buf, 'r') as fin:
        assert fin['empty'].chunks is None
        assert fin['empty'].id.get_offset() is None
        assert fin['empty'].id.get_storage_size() == 0
def test_python_int_uint64(writable_file):
    # https://github.com/h5py/h5py/issues/1547
    data = [np.iinfo(np.int64).max, np.iinfo(np.int64).max + 1]

    # Check creating a new dataset
    ds = writable_file.create_dataset('x', data=data, dtype=np.uint64)
    assert ds.dtype == np.dtype(np.uint64)
    np.testing.assert_array_equal(ds[:], np.array(data, dtype=np.uint64))

    # Check writing to an existing dataset
    ds[:] = data
    np.testing.assert_array_equal(ds[:], np.array(data, dtype=np.uint64))


def test_setitem_fancy_indexing(writable_file):
    # https://github.com/h5py/h5py/issues/1593
    arr = writable_file.create_dataset('data', (5, 1000, 2), dtype=np.uint8)
    block = np.random.randint(255, size=(5, 3, 2))
    arr[:, [0, 2, 4], ...] = block


def test_vlen_spacepad():
    with File(get_data_file_path("vlen_string_dset.h5")) as f:
        assert f["DS1"][0] == b"Parting"


def test_vlen_nullterm():
    with File(get_data_file_path("vlen_string_dset_utc.h5")) as f:
        assert f["ds1"][0] == b"2009-12-20T10:16:18.662409Z"


def test_allow_unknown_filter(writable_file):
    # apparently 256-511 are reserved for testing purposes
    fake_filter_id = 256
    ds = writable_file.create_dataset(
        'data', shape=(10, 10), dtype=np.uint8,
        compression=fake_filter_id, allow_unknown_filter=True
    )
    assert str(fake_filter_id) in ds._filters


def test_dset_chunk_cache():
    """Chunk cache configuration for individual datasets."""
    from io import BytesIO
    buf = BytesIO()
    with h5py.File(buf, 'w') as fout:
        ds = fout.create_dataset(
            'x', shape=(10, 20), chunks=(5, 4), dtype='i4',
            rdcc_nbytes=2 * 1024 * 1024, rdcc_w0=0.2, rdcc_nslots=997)
        ds_chunk_cache = ds.id.get_access_plist().get_chunk_cache()
        # The per-dataset setting must differ from the file-level default...
        assert fout.id.get_access_plist().get_cache()[1:] != ds_chunk_cache
        # ...and match what was requested at creation time.
        assert ds_chunk_cache == (997, 2 * 1024 * 1024, 0.2)

    buf.seek(0)
    with h5py.File(buf, 'r') as fin:
        ds = fin.require_dataset(
            'x', shape=(10, 20), dtype='i4',
            rdcc_nbytes=3 * 1024 * 1024, rdcc_w0=0.67, rdcc_nslots=709)
        ds_chunk_cache = ds.id.get_access_plist().get_chunk_cache()
        assert fin.id.get_access_plist().get_cache()[1:] != ds_chunk_cache
        assert ds_chunk_cache == (709, 3 * 1024 * 1024, 0.67)


class TestCommutative(BaseDataset):
    """
    Test the symmetry of operators, at least with the numpy types.
    Issue: https://github.com/h5py/h5py/issues/1947
    """

    def test_numpy_commutative(self):
        """
        Create a h5py dataset, extract one element convert to numpy
        Check that it returns symmetric response to == and !=
        """
        shape = (100, 1)
        dset = self.f.create_dataset("test", shape, dtype=float,
                                     data=np.random.rand(*shape))

        # grab a value from the elements, ie dset[0, 0]
        # check that mask arrays are commutative wrt ==, !=
        val = np.float64(dset[0, 0])

        assert np.all((val == dset) == (dset == val))
        assert np.all((val != dset) == (dset != val))

        # generate sample not in the dset, ie max(dset)+delta
        # check that mask arrays are commutative wrt ==, !=
        delta = 0.001
        nval = np.nanmax(dset) + delta

        assert np.all((nval == dset) == (dset == nval))
        assert np.all((nval != dset) == (dset != nval))

    def test_basetype_commutative(self):
        """
        Create a h5py dataset and check basetype compatibility.
        Check that operation is symmetric, even if it is potentially
        not meaningful.
        """
        shape = (100, 1)
        dset = self.f.create_dataset("test", shape, dtype=float,
                                     data=np.random.rand(*shape))

        # generate float type, sample float(0.)
        # check that operation is symmetric (but potentially meaningless)
        val = float(0.)
        assert (val == dset) == (dset == val)
        assert (val != dset) == (dset != val)


class TestVirtualPrefix(BaseDataset):
    """
    Test setting virtual prefix
    """

    def test_virtual_prefix_create(self):
        shape = (100, 1)
        virtual_prefix = "/path/to/virtual"
        dset = self.f.create_dataset("test", shape, dtype=float,
                                     data=np.random.rand(*shape),
                                     virtual_prefix=virtual_prefix)
        # Normalize through pathlib so the check is Windows-separator safe.
        virtual_prefix_readback = pathlib.Path(
            dset.id.get_access_plist().get_virtual_prefix().decode()
        ).as_posix()
        assert virtual_prefix_readback == virtual_prefix

    def test_virtual_prefix_require(self):
        virtual_prefix = "/path/to/virtual"
        dset = self.f.require_dataset('foo', (10, 3), 'f',
                                      virtual_prefix=virtual_prefix)
        virtual_prefix_readback = pathlib.Path(
            dset.id.get_access_plist().get_virtual_prefix().decode()
        ).as_posix()
        self.assertEqual(virtual_prefix, virtual_prefix_readback)
        self.assertIsInstance(dset, Dataset)
        self.assertEqual(dset.shape, (10, 3))


def ds_str(file, shape=(10, )):
    """Create a variable-length ASCII string dataset with a fill value."""
    dt = h5py.string_dtype(encoding='ascii')
    fill_value = b'fill'
    return file.create_dataset('x', shape, dtype=dt, fillvalue=fill_value)


def ds_fields(file, shape=(10, )):
    """Create a compound (string, float) dataset filled with one record."""
    dt = np.dtype([
        ('foo', h5py.string_dtype(encoding='ascii')),
        ('bar', np.float64),
    ])
    fill_value = np.asarray(('fill', 0.0), dtype=dt)
    file['x'] = np.broadcast_to(fill_value, shape)
    return file['x']


# Parametrize over the dataset itself and its astype/asstr/fields views.
view_getters = pytest.mark.parametrize(
    "view_getter,make_ds", [
        (lambda ds: ds, ds_str),
        (lambda ds: ds.astype(dtype=object), ds_str),
        (lambda ds: ds.asstr(), ds_str),
        (lambda ds: ds.fields("foo"), ds_fields),
    ],
    ids=["ds", "astype", "asstr", "fields"],
)

# np.array(..., copy=False) was repurposed in numpy 2.0; None is the new
# "copy only if needed" spelling, False the pre-2.0 one.
COPY_IF_NEEDED = False if NUMPY_RELEASE_VERSION < (2, 0) else None


@pytest.mark.parametrize("copy", [True, COPY_IF_NEEDED])
@view_getters
def test_array_copy(view_getter, make_ds, copy, writable_file):
    ds = make_ds(writable_file)
    view = view_getter(ds)
    np.array(view, copy=copy)


@pytest.mark.skipif(
    NUMPY_RELEASE_VERSION < (2, 0),
    reason="forbidding copies requires numpy 2",
)
@view_getters
def test_array_copy_false(view_getter, make_ds, writable_file):
    ds = make_ds(writable_file)
    view = view_getter(ds)
    # Reading from HDF5 always allocates, so copy=False must fail.
    with pytest.raises(ValueError, match="memory allocation cannot be avoided"):
        np.array(view, copy=False)


@view_getters
def test_array_dtype(view_getter, make_ds, writable_file):
    ds = make_ds(writable_file)
    view = view_getter(ds)
    assert np.array(view, dtype='|S10').dtype == np.dtype('|S10')


@view_getters
def test_array_scalar(view_getter, make_ds, writable_file):
    ds = make_ds(writable_file, shape=())
    view = view_getter(ds)
    assert isinstance(view[()], (bytes, str))
    assert np.array(view).shape == ()


@view_getters
def test_array_nd(view_getter, make_ds, writable_file):
    ds = make_ds(writable_file, shape=(5, 6))
    view = view_getter(ds)
    assert np.array(view).shape == (5, 6)


@view_getters
def test_view_properties(view_getter, make_ds, writable_file):
    ds = make_ds(writable_file, shape=(5, 6))
    view = view_getter(ds)
    assert view.dtype == np.dtype(object)
    assert view.ndim == 2
    assert view.shape == (5, 6)
    assert view.size == 30
    assert len(view) == 5