Source code for pys5p.ocm_io

#
# This file is part of pyS5p
#
# https://github.com/rmvanhees/pys5p.git
#
# Copyright (c) 2017-2022 SRON - Netherlands Institute for Space Research
#   All Rights Reserved
#
# License:  BSD-3-Clause
"""This module contain class `OCMio` to access on-ground calibration data."""

from __future__ import annotations

__all__ = ['OCMio']

from datetime import datetime, timedelta
from pathlib import Path, PurePosixPath
from typing import Any

import h5py
import numpy as np
from moniplot.biweight import Biweight

# - global parameters ------------------------------


# - local functions --------------------------------
def band2channel(dict_a: dict, dict_b: dict,
                 mode: bool = None) -> np.ndarray | tuple[Any, Any]:
    """Store data from a dictionary as returned by get_msm_data to a ndarray.

    Parameters
    ----------
    dict_a      :  dict
       data of the one spectral band
    dict_b      :  dict
       data of another spectral band
    mode        :  list ['combined', 'mean', 'median', 'biweight']
        'combined'
             will combine data using np.concatenate((data_a, data_b),\
             axis=data_a.ndim-1)

        'mean'
             is calculated using np.nanmean(data, axis=0)

        'median'
             is calculated using np.nanmedian(data, axis=0)

        'biweight'
             is calculated using biweight(data, axis=0)

    Returns
    -------
    numpy.ndarray
        Data from dictionary stored in a numpy array

    Examples
    --------
    > data = ocm.band2channel(dict_a, dict_b, mode=['combined', 'median'])
    """
    if mode is None:
        mode = []

    data_a = None
    for key in sorted(dict_a):
        buff = dict_a[key][...]

        data_a = buff if data_a is None else np.vstack((data_a, buff))

    if data_a is not None:
        if 'mean' in mode:
            data_a = np.nanmean(data_a, axis=0)
        elif 'median' in mode:
            data_a = np.nanmedian(data_a, axis=0)
        elif 'biweight' in mode:
            data_a = Biweight(data_a, axis=0).median

    if dict_b is None:
        return data_a

    data_b = None
    for key in sorted(dict_b):
        buff = dict_b[key][...]

        data_b = buff if data_b is None else np.vstack((data_b, buff))

    if data_b is not None:
        if 'mean' in mode:
            data_b = np.nanmean(data_b, axis=0)
        elif 'median' in mode:
            data_b = np.nanmedian(data_b, axis=0)
        elif 'biweight' in mode:
            data_b = Biweight(data_b, axis=0).median

    if 'combined' in mode:
        return np.concatenate((data_a, data_b), axis=data_a.ndim-1)

    return data_a, data_b


# - class definition -------------------------------
[docs] class OCMio: """This class should offer all the necessary functionality to read Tropomi on-ground calibration products (Lx). Parameters ---------- ocm_product : Path Full path to on-ground calibration measurement """ def __init__(self, ocm_product: Path): """Initialize access to an OCAL Lx product.""" if not ocm_product.is_file(): raise FileNotFoundError(f'{ocm_product.name} does not exist') # initialize class-attributes self.__msm_path = None self.band = None self.filename = ocm_product # open OCM product as HDF5 file self.fid = h5py.File(ocm_product, 'r') def __iter__(self): """Allow iteration.""" for attr in sorted(self.__dict__): if not attr.startswith('__'): yield attr # def __del__(self): # """ # called when the object is destroyed # """ # self.close() def __enter__(self): """Initiate the context manager.""" return self def __exit__(self, exc_type, exc_value, traceback): """Exit the context manager.""" self.close() return False # any exception is raised by the with statement.
[docs] def close(self): """Close resources.""" self.band = None if self.fid is not None: self.fid.close() self.fid = None
# ---------- RETURN VERSION of the S/W ---------- # ---------- Functions that work before MSM selection ----------
[docs] def get_processor_version(self) -> str: """Return version of the L01b processor.""" res = self.fid.attrs['processor_version'] if isinstance(res, bytes): # pylint: disable=no-member res = res.decode('ascii') return res
[docs] def get_coverage_time(self) -> tuple[str, str]: """Return start and end of the measurement coverage time.""" t_bgn = self.fid.attrs['time_coverage_start'] if isinstance(t_bgn, bytes): # pylint: disable=no-member t_bgn = t_bgn.decode('ascii') t_end = self.fid.attrs['time_coverage_end'] if isinstance(t_end, bytes): # pylint: disable=no-member t_end = t_end.decode('ascii') return t_bgn, t_end
[docs] def get_attr(self, attr_name) -> np.ndarray | None: """Obtain value of an HDF5 file attribute. Parameters ---------- attr_name : string name of the attribute """ if attr_name in self.fid.attrs: return self.fid.attrs[attr_name] return None
# ---------- Functions that only work after MSM selection ----------
[docs] def get_ref_time(self) -> np.ndarray | None: """Return reference start time of measurements.""" if not self.__msm_path: return {} grp = self.fid[f'BAND{self.band}'] res = {} for msm in sorted(self.__msm_path): sgrp = grp[str(PurePosixPath(msm, 'GEODATA'))] res[msm] = datetime(2010, 1, 1, 0, 0, 0) res[msm] += timedelta(seconds=int(sgrp['time'][0])) return res
[docs] def get_delta_time(self) -> np.ndarray | None: """Return offset from the reference start time of measurement.""" if not self.__msm_path: return {} grp = self.fid[f'BAND{self.band}'] res = {} for msm in sorted(self.__msm_path): sgrp = grp[str(PurePosixPath(msm, 'GEODATA'))] res[msm] = sgrp['delta_time'][:].astype(int) return res
[docs] def get_instrument_settings(self) -> np.ndarray | None: """Return instrument settings of measurement.""" if not self.__msm_path: return {} grp = self.fid[f'BAND{self.band}'] res = {} for msm in sorted(self.__msm_path): sgrp = grp[str(PurePosixPath(msm, 'INSTRUMENT'))] res[msm] = np.squeeze(sgrp['instrument_settings']) return res
[docs] def get_gse_stimuli(self) -> np.ndarray | None: """Return GSE stimuli parameters.""" if not self.__msm_path: return {} grp = self.fid[f'BAND{self.band}'] res = {} for msm in sorted(self.__msm_path): sgrp = grp[str(PurePosixPath(msm, 'INSTRUMENT'))] res[msm] = np.squeeze(sgrp['gse_stimuli']) return res
[docs] def get_exposure_time(self) -> np.ndarray | None: """Return the exact pixel exposure time of the measurements.""" if not self.__msm_path: return None grp = self.fid[f'BAND{self.band}'] msm = self.__msm_path[0] # all measurement sets have the same ICID sgrp = grp[str(PurePosixPath(msm, 'INSTRUMENT'))] instr = np.squeeze(sgrp['instrument_settings']) if int(self.band) > 6: return 1.25e-6 * (65540 - instr['int_delay'] + instr['int_hold']) return instr['exposure_time']
[docs] def get_housekeeping_data(self) -> np.ndarray | None: """Return housekeeping data of measurements.""" if not self.__msm_path: return {} grp = self.fid[f'BAND{self.band}'] res = {} for msm in sorted(self.__msm_path): sgrp = grp[str(PurePosixPath(msm, 'INSTRUMENT'))] res[msm] = np.squeeze(sgrp['housekeeping_data']) return res
# -------------------------
[docs] def select(self, ic_id: int = None, *, msm_grp: str | None = None) -> int: """Select a measurement as BAND%/ICID_<ic_id>_GROUP_%. Parameters ---------- ic_id : int used as "BAND%/ICID_{}_GROUP_%".format(ic_id) msm_grp : str select measurement group with name msm_grp All measurements groups are shown when ic_id and msm_grp are None Returns ------- scalar Number of measurements found Notes ----- Updated object attributes: - bands : available spectral bands """ self.band = '' self.__msm_path = [] for ii in '87654321': if f'BAND{ii}' in self.fid: self.band = ii break if self.band: gid = self.fid[f'BAND{self.band}'] if msm_grp is not None and msm_grp in gid: self.__msm_path = [msm_grp] elif ic_id is None: grp_name = 'ICID_' for kk in gid: if kk.startswith(grp_name): print(kk) else: grp_name = f'ICID_{ic_id:05}_GROUP' self.__msm_path = [s for s in gid if s.startswith(grp_name)] return len(self.__msm_path)
# -------------------------
[docs] def get_msm_attr(self, msm_dset: str, attr_name: str) -> str | None: """Return attribute of measurement dataset "msm_dset". Parameters ---------- msm_dset : str name of measurement dataset attr_name : str name of the attribute Returns ------- scalar or numpy.ndarray value of attribute "attr_name" """ if not self.__msm_path: return '' grp = self.fid[f'BAND{self.band}'] for msm_path in self.__msm_path: ds_path = str(PurePosixPath(msm_path, 'OBSERVATIONS', msm_dset)) if attr_name in grp[ds_path].attrs: attr = grp[ds_path].attrs[attr_name] if isinstance(attr, bytes): return attr.decode('ascii') return attr return None
# -------------------------
[docs] def get_msm_data(self, msm_dset: str, fill_as_nan: bool = True, frames: list[int, int] | None = None, columns: list[int, int] | None = None) -> dict: """Return data of measurement dataset `msm_dset`. Parameters ---------- msm_dset : str name of measurement dataset if msm_dset is None then show names of available datasets columns : [i, j] Slice data on fastest axis (columns) as, from index 'i' to 'j' frames : [i, j] Slice data on slowest axis (time) as, from index 'i' to 'j' fill_as_nan : boolean replace (float) FillValues with Nan's Returns ------- dict Python dictionary with names of msm_groups as keys """ fillvalue = float.fromhex('0x1.ep+122') if not self.__msm_path: return {} # show HDF5 dataset names and return grp = self.fid[f'BAND{self.band}'] if msm_dset is None: ds_path = str(PurePosixPath(self.__msm_path[0], 'OBSERVATIONS')) for kk in grp[ds_path]: print(kk) return {} # skip row257 from the SWIR detector rows = None if self.band in ('7', '8'): rows = [0, -1] # combine data of all measurement groups in dictionary res = {} for msm_grp in sorted(self.__msm_path): dset = grp[str(PurePosixPath(msm_grp, 'OBSERVATIONS', msm_dset))] data_sel = () for ii in range(dset.ndim): dim_name = PurePosixPath(dset.dims[ii][0].name).name if dim_name == 'msmt_time': if frames is None: data_sel += (slice(None),) else: data_sel += (slice(*frames),) elif dim_name == 'row': if rows is None: data_sel += (slice(None),) else: data_sel += (slice(*rows),) elif dim_name == 'column': if columns is None: data_sel += (slice(None),) else: data_sel += (slice(*columns),) else: raise ValueError # read data if dset.dtype == np.float32: data = np.squeeze(dset.astype(float)[data_sel]) else: data = np.squeeze(dset[data_sel]) if fill_as_nan and dset.attrs['_FillValue'] == fillvalue: data[(data == fillvalue)] = np.nan # add data to dictionary res[msm_grp] = data return res
# -------------------------
[docs] def read_direct_msm(self, msm_dset: str, dest_sel: tuple[slice | int] | None = None, dest_dtype: type[Any] | None = None, fill_as_nan: bool = False) -> dict | None: """Return data of measurement dataset `msm_dset` (fast implementation). Parameters ---------- msm_dset : string Name of measurement dataset dest_sel : numpy slice Selection must be the output of numpy.s_[<args>]. dest_dtype : numpy dtype Perform type conversion fill_as_nan : boolean Replace (float) FillValues with Nan's, when True Returns ------- dict Python dictionary with names of msm_groups as keys """ fillvalue = float.fromhex('0x1.ep+122') if not self.__msm_path: return None if dest_sel is None: dest_sel = np.s_[...] # combine data of all measurement groups in dictionary res = {} for msm_grp in sorted(self.__msm_path): dset = self.fid[str(PurePosixPath( f'BAND{self.band}', msm_grp, 'OBSERVATIONS', msm_dset))] if dest_dtype is None: buff = dset[dest_sel] else: buff = dset.astype(dest_dtype)[dest_sel] if fill_as_nan and dset.attrs['_FillValue'] == fillvalue: buff[(buff == fillvalue)] = np.nan # add data to dictionary res[msm_grp] = buff return res