Source code for ssapy_toolkit.IO.dict_to_from_hdf5

"""
Utilities for saving/loading nested Python dictionaries to/from HDF5.

Public API:
    save_dict_to_hdf5(filename, data, ...)
    load_dict_from_hdf5(filename)
"""

import os
import pickle
import datetime as _dt
from typing import Any, Mapping, Union

import h5py
import numpy as np

try:
    from astropy.time import Time as AstroTime
    _HAS_ASTROPY = True
except ImportError:
    _HAS_ASTROPY = False


# ---------- SAVE ----------


[docs] def save_dict_to_hdf5( filename: str, data: Mapping[str, Any], mode: str = "w", *, pickle_objects: bool = True, compression: Union[str, None] = "gzip", compression_opts: Union[int, None] = 4, ) -> None: """ Save a (possibly nested) Python dictionary to an HDF5 file. Signature: save_dict_to_hdf5(filename, data, ...) Supported types: - dict -> HDF5 group - list/tuple -> HDF5 group with numeric keys ("0", "1", ...) - numpy arrays -> datasets (with compression, if requested) - scalars: int/float/bool, numpy scalar types -> scalar datasets (no compression) - str -> variable-length UTF-8 datasets - bytes/bytearray/memoryview -> bytes datasets - datetime.datetime/date/time -> stored as ISO strings - astropy.time.Time -> stored as (mjd, scale, format, meta) - other objects -> pickled if pickle_objects=True """ def _store_string_with_type( h5group: h5py.Group, key: str, value: str, type_name: str, ) -> None: if key in h5group: del h5group[key] dt = h5py.string_dtype(encoding="utf-8") ds = h5group.create_dataset( key, data=np.array(value, dtype=dt), dtype=dt, ) ds.attrs["__type__"] = type_name def _write_item( h5group: h5py.Group, key: str, value: Any, ) -> None: # nested dict -> subgroup if isinstance(value, Mapping): if key in h5group and isinstance(h5group[key], h5py.Group): subgroup = h5group[key] # clear any old sequence flag subgroup.attrs["__is_sequence__"] = False else: subgroup = h5group.require_group(key) subgroup.attrs["__is_sequence__"] = False for k, v in value.items(): if not isinstance(k, str): raise TypeError(f"HDF5 requires string keys; got key={k!r}") _write_item(subgroup, k, v) # list/tuple -> group with numeric keys elif isinstance(value, (list, tuple)): if key in h5group and isinstance(h5group[key], h5py.Group): subgroup = h5group[key] subgroup.attrs["__is_sequence__"] = True else: subgroup = h5group.require_group(key) subgroup.attrs["__is_sequence__"] = True # remove any existing children (in case of overwrite) for child in list(subgroup.keys()): del subgroup[child] for idx, v in enumerate(value): _write_item(subgroup, str(idx), v) # astropy Time elif _HAS_ASTROPY and isinstance(value, AstroTime): if key in h5group: del h5group[key] t_group = h5group.create_group(key) mjd_arr = np.array(value.mjd, dtype="float64") t_group.create_dataset("mjd", data=mjd_arr) t_group.attrs["__type__"] = "astropy.time.Time" t_group.attrs["scale"] = value.scale t_group.attrs["format"] = "mjd" if getattr(value, "meta", None): meta_pickled = pickle.dumps(dict(value.meta), protocol=pickle.HIGHEST_PROTOCOL) meta_ds = t_group.create_dataset( "meta", data=np.frombuffer(meta_pickled, dtype="uint8"), ) meta_ds.attrs["pickled"] = True # datetime types -> ISO strings with type attrs elif isinstance(value, _dt.datetime): _store_string_with_type(h5group, key, value.isoformat(), "datetime.datetime") elif isinstance(value, _dt.date): _store_string_with_type(h5group, key, value.isoformat(), "datetime.date") elif isinstance(value, _dt.time): _store_string_with_type(h5group, key, value.isoformat(), "datetime.time") # numpy array elif isinstance(value, np.ndarray): if key in h5group: del h5group[key] if value.shape == (): # scalar np.ndarray: no compression h5group.create_dataset(key, data=value) else: h5group.create_dataset( key, data=value, compression=compression, compression_opts=compression_opts, ) # numeric scalars elif isinstance(value, (int, float, bool, np.integer, np.floating, np.bool_)): if key in h5group: del h5group[key] h5group.create_dataset(key, data=value) # strings -> vlen UTF-8 elif isinstance(value, str): if key in h5group: del h5group[key] dt = h5py.string_dtype(encoding="utf-8") h5group.create_dataset( key, data=np.array(value, dtype=dt), dtype=dt, ) # bytes-like elif isinstance(value, (bytes, bytearray, memoryview)): if key in h5group: del h5group[key] arr = np.frombuffer(bytes(value), dtype="uint8") ds = h5group.create_dataset(key, data=arr) ds.attrs["__bytes__"] = True # fallback: pickle else: if not pickle_objects: raise TypeError( f"Unsupported type for key '{key}': {type(value)!r}. " f"Enable pickle_objects=True to store via pickle." ) if key in h5group: del h5group[key] pickled = pickle.dumps(value, protocol=pickle.HIGHEST_PROTOCOL) ds = h5group.create_dataset( key, data=np.frombuffer(pickled, dtype="uint8"), ) ds.attrs["pickled"] = True ds.attrs["python_type"] = str(type(value)) ds.attrs["__type__"] = "pickle" os.makedirs(os.path.dirname(os.path.abspath(filename)), exist_ok=True) with h5py.File(filename, mode) as f: for k, v in data.items(): if not isinstance(k, str): raise TypeError(f"HDF5 requires string keys; got key={k!r}") _write_item(f, k, v)
# ---------- LOAD ----------
[docs] def load_dict_from_hdf5(filename: str) -> dict: """ Load a dictionary previously stored with save_dict_to_hdf5(filename, data, ...). Behavior: - Groups marked with __is_sequence__=True are reconstructed as lists, even if empty. - Groups marked with __is_sequence__=False are reconstructed as dicts, even if empty. - Groups without the attribute: * if keys are 0..n-1 as strings, treated as lists * otherwise, treated as dicts. - Supported special types: astropy.time.Time, datetime.*, bytes, pickled objects. """ def _read_item(obj: Union[h5py.Group, h5py.Dataset]) -> Any: # Group if isinstance(obj, h5py.Group): # astropy Time group? if "__type__" in obj.attrs and obj.attrs["__type__"] == "astropy.time.Time": if not _HAS_ASTROPY: raise ImportError( "astropy is required to load astropy.time.Time objects." ) mjd = np.array(obj["mjd"][...], dtype="float64") scale = obj.attrs["scale"] t = AstroTime(mjd, format="mjd", scale=scale) if "meta" in obj: meta_ds = obj["meta"] if meta_ds.attrs.get("pickled", False): meta_bytes = bytes(meta_ds[...].tolist()) meta = pickle.loads(meta_bytes) t.meta.update(meta) return t # Explicit sequence or mapping marker if "__is_sequence__" in obj.attrs: if obj.attrs["__is_sequence__"]: # list-like keys = list(obj.keys()) if not keys: return [] int_keys = sorted(int(k) for k in keys) return [_read_item(obj[str(i)]) for i in int_keys] else: # dict-like out = {} for k in obj.keys(): out[k] = _read_item(obj[k]) return out # No explicit marker: infer from keys keys = list(obj.keys()) if not keys: # no marker, no children: default to dict return {} try: int_keys = sorted(int(k) for k in keys) is_seq = int_keys == list(range(len(keys))) except ValueError: is_seq = False if is_seq: return [_read_item(obj[str(i)]) for i in range(len(keys))] else: out = {} for k in keys: out[k] = _read_item(obj[k]) return out # Dataset ds: h5py.Dataset = obj # type: ignore[assignment] # pickled object? if ds.attrs.get("__type__") == "pickle" or ds.attrs.get("pickled", False): arr = np.array(ds[...], dtype="uint8") return pickle.loads(arr.tobytes()) # bytes dataset? if ds.attrs.get("__bytes__", False): arr = np.array(ds[...], dtype="uint8") return bytes(arr.tobytes()) # type-tagged strings (datetime, etc.) tname = ds.attrs.get("__type__", None) if tname: value = ds.asstr()[()] if isinstance(value, np.ndarray): value = value.tolist() if tname == "datetime.datetime": return _dt.datetime.fromisoformat(value) elif tname == "datetime.date": return _dt.date.fromisoformat(value) elif tname == "datetime.time": return _dt.time.fromisoformat(value) # plain string dataset if h5py.check_string_dtype(ds.dtype) is not None: return ds.asstr()[()] # numeric / array dataset arr = ds[...] if arr.shape == (): return arr[()] # numpy scalar return arr with h5py.File(filename, "r") as f: result = {} for k in f.keys(): result[k] = _read_item(f[k]) return result