#
# This file is part of pyS5p
#
# https://github.com/rmvanhees/pys5p.git
#
# Copyright (c) 2017-2022 SRON - Netherlands Institute for Space Research
# All Rights Reserved
#
# License: BSD-3-Clause
"""This module contains the class `CKDio`."""
from __future__ import annotations
__all__ = ['CKDio']
from pathlib import Path, PosixPath
import h5py
import numpy as np
import xarray as xr
from moniplot.image_to_xarray import h5_to_xr
# - local functions ------------------------------
def reject_row257(xarr: xr.DataArray | xr.Dataset) -> xr.DataArray | xr.Dataset:
"""Remove row 257 from DataArray or Dataset."""
return xarr.isel(row=np.s_[0:256])
# - class definition -------------------------------
[docs]
class CKDio:
"""
Read Tropomi CKD from the Static CKD product or from dynamic CKD products.
Parameters
----------
ckd_dir : Path, default=Path('/nfs/Tropomi/share/ckd')
Directory where the CKD files are stored
ckd_version : int, default=1
Version of the CKD
ckd_file : str, optional
Name of the CKD file, default=None then the CKD file is searched
in the directory ckd_dir with ckd_version in the glob-string
Notes
-----
Not all CKD are defined or derived for all bands.
You can request a CKD for one band or for a channel (bands: '12', '34',
'56', '78'). Do not mix bands from different channels
The option to have dynamic CKD is not used for the Tropomi mission, only
for S/W version 1 a dynamic CKD product is defined. This product contained
the OCAL CKD and was not updated automatically. For version 2, all CKD are
stored in one product, where some CKD have a time-axis to correct any
in-flight degradation.
Therefore, the logic to find a CKD is implemented as follows:
1) ckd_dir, defines the base directory to search for the CKD products
(see below).
2) ckd_file, defines the full path to (static) CKD product;
(version 1) any product with dynamic CKD has to be in the same
directory.
Version 1:
* Static CKD are stored in one file: glob('*_AUX_L1_CKD_*')
* Dynamic CKD are stored in two files:
- UVN, use glob('*_ICM_CKDUVN_*')
- SWIR, use glob('*_ICM_CKDSIR_*')
Version 2+:
* All CKD in one file: glob('*_AUX_L1_CKD_*')
* Dynamic CKD are empty
"""
def __init__(self, ckd_dir: Path | None = None, ckd_version: int = 1,
ckd_file: Path | None = None) -> None:
"""Create CKDio object."""
if ckd_dir is None:
ckd_dir = Path('/nfs/Tropomi/share/ckd')
self.ckd_version = max(1, ckd_version)
self.ckd_dyn_file = None
# define path to CKD product
if ckd_file is None:
if not ckd_dir.is_dir():
raise FileNotFoundError(f'Not found CKD directory: {ckd_dir.name}')
self.ckd_dir = ckd_dir
glob_str = f'*_AUX_L1_CKD_*_*_00000_{self.ckd_version:02d}_*_*.h5'
if (self.ckd_dir / 'static').is_dir():
res = sorted((self.ckd_dir / 'static').glob(glob_str))
else:
res = sorted(self.ckd_dir.glob(glob_str))
if not res:
raise FileNotFoundError('Static CKD product not found')
self.ckd_file = res[-1]
else:
if not ckd_file.is_file():
raise FileNotFoundError(f'Not found CKD file: {ckd_file.name}')
self.ckd_dir = ckd_file.parent
self.ckd_file = ckd_file
# obtain path to dynamic CKD product (version 1, only)
if self.ckd_version == 1:
if (self.ckd_dir / 'dynamic').is_dir():
res = sorted((self.ckd_dir / 'dynamic').glob('*_ICM_CKDSIR_*'))
else:
res = sorted(self.ckd_dir.glob('*_ICM_CKDSIR_*'))
if res:
self.ckd_dyn_file = res[-1]
# open access to CKD product
self.fid = h5py.File(self.ckd_file, 'r')
def __enter__(self):
"""Initiate the context manager."""
return self
def __exit__(self, exc_type, exc_value, traceback):
"""Exit the context manager."""
self.close()
return False # any exception is raised by the with statement.
[docs]
def close(self) -> None:
"""Make sure that we close all resources."""
if self.fid is not None:
self.fid.close()
[docs]
def creation_time(self) -> str:
"""Return datetime when the L1b product was created."""
if self.ckd_version == 2:
attr = self.fid['METADATA'].attrs['production_datetime']
else:
group = PosixPath('METADATA', 'earth_explorer_header',
'fixed_header', 'source')
attr = self.fid[str(group)].attrs['Creator_Date'][0]
if isinstance(attr, bytes):
attr = attr.decode('ascii')
return attr
[docs]
def creator_version(self) -> str:
"""Return version of Tropomi L01B processor."""
group = PosixPath('METADATA', 'earth_explorer_header', 'fixed_header')
attr = self.fid[str(group)].attrs['File_Version']
if self.ckd_version == 1:
attr = attr[0]
if isinstance(attr, bytes):
attr = attr.decode('ascii')
return attr
@staticmethod
def __get_spectral_channel(bands: str) -> str:
"""Check bands is valid: single band or belong to one channel.
Parameters
----------
bands : str
Tropomi bands [1..8] or channels ['12', '34', '56', '78'],
"""
band2channel = ['UNKNOWN', 'UV', 'UV', 'VIS', 'VIS',
'NIR', 'NIR', 'SWIR', 'SWIR']
if 0 < len(bands) > 2:
raise ValueError('read per band or channel, only')
if (len(bands) == 2
and band2channel[int(bands[0])] != band2channel[int(bands[1])]):
raise ValueError('bands should be of the same channel')
return band2channel[int(bands[0])]
[docs]
def get_param(self, ds_name: str,
band: str = '7') -> np.ndarray | float:
"""Return value(s) of a CKD parameter from the Static CKD product.
Parameters
----------
ds_name : str
Name of the HDF5 dataset, default='pixel_full_well'
band : str, default='7'
Band identifier '1', '2', ..., '8'
Returns
-------
numpy.ndarray or scalar
CKD parameter value
Notes
-----
Datasets of size=1 are return as scalar
Handy function for scalar HDF5 datasets, such as:
- dc_reference_temp
- dpqf_threshold
- pixel_full_well
- pixel_fw_flag_thresh
"""
if not 1 <= int(band) <= 8:
raise ValueError('band must be between and 1 and 8')
if ds_name not in self.fid[f'/BAND{band}']:
raise ValueError('dataset not available')
return self.fid[f'/BAND{band}/{ds_name}'][()]
# ---------- band or channel CKD's ----------
[docs]
def dn2v_factors(self) -> np.ndarray:
"""Return digital number to Volt CKD, SWIR only.
Notes
-----
The DN2V factor has no error attached to it.
"""
return np.concatenate(
(self.fid['/BAND7/dn2v_factor_swir'][2:],
self.fid['/BAND8/dn2v_factor_swir'][2:]))
[docs]
def v2c_factors(self) -> np.ndarray:
"""Return Voltage to Charge CKD, SWIR only.
Notes
-----
The V2C factor has no error attached to it.
"""
# pylint: disable=no-member
return np.concatenate(
(self.fid['/BAND7/v2c_factor_swir'].fields('value')[2:],
self.fid['/BAND8/v2c_factor_swir'].fields('value')[2:]))
# ---------- spectral-channel CKD's ----------
def __rd_dataset(self, dset_name: str, bands: str) -> xr.Dataset | None:
"""General function to read non-compound dataset into xarray::Dataset.
Parameters
----------
dset_name: str
name (including path) of the dataset as '/BAND{}/<name>'
bands : str
Tropomi bands [1..8] or channels ['12', '34', '56', '78'],
Returns
-------
xarray.Dataset
parameters of CKD with name 'dset_name'
"""
ckd_val = None
for band in bands:
# try Static-CKD product
if dset_name.format(band) in self.fid:
if ckd_val is None:
ckd_val = h5_to_xr(self.fid[dset_name.format(band)])
else:
ckd_val = xr.concat(
(ckd_val,
h5_to_xr(self.fid[dset_name.format(band)])),
dim='column')
# try Dynamic-CKD product
else:
dyn_fid = h5py.File(self.ckd_dyn_file, 'r')
if dset_name.format(band) in dyn_fid:
if ckd_val is None:
ckd_val = h5_to_xr(dyn_fid[dset_name.format(band)])
else:
ckd_val = xr.concat(
(ckd_val,
h5_to_xr(dyn_fid[dset_name.format(band)])),
dim='column')
dyn_fid.close()
if ckd_val is None:
return None
# Use NaN as FillValue
ckd_val = ckd_val.where(ckd_val != float.fromhex('0x1.ep+122'),
other=np.nan)
# combine DataArrays to Dataset
return xr.Dataset({'value': ckd_val}, attrs=ckd_val.attrs)
def __rd_datapoints(self, dset_name: str, bands: str) -> xr.Dataset | None:
"""General function to read datapoint dataset into xarray::Dataset.
Parameters
----------
dset_name: str
name (including path) of the dataset as '/BAND{}/<name>'
bands : str
Tropomi bands [1..8] or channels ['12', '34', '56', '78'],
default: '78'
Returns
-------
xarray.Dataset
parameters (value and uncertainty) of CKD with name 'dset_name'
"""
ckd_val = None
ckd_err = None
for band in bands:
# try Static-CKD product
if dset_name.format(band) in self.fid:
if ckd_val is None:
ckd_val = h5_to_xr(self.fid[dset_name.format(band)],
field='value')
ckd_err = h5_to_xr(self.fid[dset_name.format(band)],
field='error')
else:
ckd_val = xr.concat(
(ckd_val, h5_to_xr(self.fid[dset_name.format(band)],
field='value')), dim='column')
ckd_err = xr.concat(
(ckd_err, h5_to_xr(self.fid[dset_name.format(band)],
field='error')), dim='column')
# try Dynamic-CKD product
else:
dyn_fid = h5py.File(self.ckd_dyn_file, 'r')
if dset_name.format(band) in dyn_fid:
if ckd_val is None:
ckd_val = h5_to_xr(dyn_fid[dset_name.format(band)],
field='value')
ckd_err = h5_to_xr(dyn_fid[dset_name.format(band)],
field='error')
else:
ckd_val = xr.concat(
(ckd_val, h5_to_xr(dyn_fid[dset_name.format(band)],
field='value')), dim='column')
ckd_err = xr.concat(
(ckd_err, h5_to_xr(dyn_fid[dset_name.format(band)],
field='error')), dim='column')
dyn_fid.close()
if ckd_val is None:
return None
# Use NaN as FillValue
ckd_val = ckd_val.where(ckd_val != float.fromhex('0x1.ep+122'),
other=np.nan)
ckd_err = ckd_err.where(ckd_err != float.fromhex('0x1.ep+122'),
other=np.nan)
# combine DataArrays to Dataset
return xr.Dataset({'value': ckd_val, 'error': ckd_err},
attrs=ckd_val.attrs)
# ---------- static CKD's ----------
[docs]
def absirr(self, qvd: int = 1, bands: str = '78') -> xr.Dataset:
"""Return absolute irradiance responsivity.
Parameters
----------
qvd : int, default: 1
Tropomi QVD identifier. Valid values are 1 or 2
bands : str, default: '78'
Tropomi bands [1..8] or channels ['12', '34', '56', '78']
"""
try:
channel = self.__get_spectral_channel(bands)
except Exception as exc:
raise RuntimeError(exc) from exc
dset_name = '/BAND{}' + f'/abs_irr_conv_factor_qvd{qvd}'
ckd = self.__rd_datapoints(dset_name, bands)
if '7' in bands or '8' in bands:
ckd = reject_row257(ckd)
ckd.attrs['long_name'] = \
f'{channel} absolute irradiance CKD (QVD={qvd})'
return ckd.assign_coords(column=np.arange(ckd.column.size, dtype='u4'))
[docs]
def absrad(self, bands: str = '78') -> xr.Dataset:
"""Return absolute radiance responsivity.
Parameters
----------
bands : str, default: '78'
Tropomi bands [1..8] or channels ['12', '34', '56', '78']
"""
try:
channel = self.__get_spectral_channel(bands)
except Exception as exc:
raise RuntimeError(exc) from exc
dset_name = '/BAND{}/abs_rad_conv_factor'
ckd = self.__rd_datapoints(dset_name, bands)
if '7' in bands or '8' in bands:
ckd = reject_row257(ckd)
ckd.attrs['long_name'] = f'{channel} absolute radiance CKD'
return ckd.assign_coords(column=np.arange(ckd.column.size, dtype='u4'))
[docs]
def memory(self) -> xr.Dataset:
"""Return memory CKD, SWIR only."""
column = None
ckd_parms = ['mem_lin_neg_swir', 'mem_lin_pos_swir',
'mem_qua_neg_swir', 'mem_qua_pos_swir']
ckd = xr.Dataset()
ckd.attrs['long_name'] = 'SWIR memory CKD'
for key in ckd_parms:
dset_name = f'/BAND7/{key}'
ckd_val = h5_to_xr(self.fid[dset_name], field='value')
ckd_err = h5_to_xr(self.fid[dset_name], field='error')
dset_name = f'/BAND8/{key}'
ckd_val = xr.concat(
(ckd_val, h5_to_xr(self.fid[dset_name], field='value')),
dim='column')
if column is None:
column = np.arange(ckd_val.column.size, dtype='u4')
ckd_val = ckd_val.assign_coords(column=column)
ckd_err = xr.concat(
(ckd_err, h5_to_xr(self.fid[dset_name], field='error')),
dim='column')
ckd_err = ckd_err.assign_coords(column=column)
ckd[key.replace('swir', 'value')] = reject_row257(ckd_val)
ckd[key.replace('swir', 'error')] = reject_row257(ckd_err)
return ckd
[docs]
def noise(self, bands: str = '78') -> xr.Dataset:
"""Return readout-noise CKD, SWIR only.
Parameters
----------
bands : str, default: '78'
Tropomi bands [1..8] or channels ['12', '34', '56', '78']
"""
dset_name = '/BAND{}/readout_noise_swir'
ckd = reject_row257(self.__rd_dataset(dset_name, bands))
ckd.attrs['long_name'] = 'SWIR readout-noise CKD'
return ckd.assign_coords(column=np.arange(ckd.column.size, dtype='u4'))
[docs]
def prnu(self, bands: str = '78') -> xr.Dataset:
"""Return Pixel Response Non-Uniformity (PRNU).
Parameters
----------
bands : str, default: '78'
Tropomi bands [1..8] or channels ['12', '34', '56', '78']
"""
try:
channel = self.__get_spectral_channel(bands)
except Exception as exc:
raise RuntimeError(exc) from exc
ckd = self.__rd_datapoints('/BAND{}/PRNU', bands)
if '7' in bands or '8' in bands:
ckd = reject_row257(ckd)
ckd.attrs['long_name'] = f'{channel} PRNU CKD'
return ckd.assign_coords(column=np.arange(ckd.column.size, dtype='u4'))
[docs]
def relirr(self, qvd: int = 1, bands: str = '78') -> tuple[dict] | None:
"""Return relative irradiance correction.
Parameters
----------
bands : str, default: '78'
Tropomi bands [1..8] or channels ['12', '34', '56', '78']
qvd : int
Tropomi QVD identifier. Valid values are 1 or 2, default: 1
Returns
-------
dict
CKD for relative irradiance correction as dictionaries with keys:
- band: Tropomi spectral band ID
- mapping_cols: coarse irregular mapping of the columns
- mapping_rows: coarse irregular mapping of the rows
- cheb_coefs: Chebyshev parameters for elevation and azimuth \
for pixels on a coarse irregular grid
"""
try:
_ = self.__get_spectral_channel(bands)
except Exception as exc:
raise RuntimeError(exc) from exc
res = ()
for band in bands:
ckd = {'band': int(band)}
dsname = f'/BAND{band}/rel_irr_coarse_mapping_vert'
ckd['mapping_rows'] = self.fid[dsname][:].astype(int)
dsname = f'/BAND{band}/rel_irr_coarse_mapping_hor'
# pylint: disable=no-member
mapping_hor = self.fid[dsname][:].astype(int)
mapping_hor[mapping_hor > 1000] -= 2**16
ckd['mapping_cols'] = mapping_hor
dsname = f'/BAND{band}/rel_irr_coarse_func_cheb_qvd{qvd}'
ckd['cheb_coefs'] = self.fid[dsname]['coefs'][:]
res += (ckd,)
return res if res else None
[docs]
def saa(self) -> dict:
"""Return definition of the SAA region."""
return {
'lat': self.fid['saa_latitude'][:],
'lon': self.fid['saa_longitude'][:]}
[docs]
def wavelength(self, bands: str = '78') -> xr.Dataset:
"""Return wavelength CKD.
Parameters
----------
bands : str, default: '78'
Tropomi bands [1..8] or channels ['12', '34', '56', '78']
Notes
-----
The wavelength CKD has no error attached to it.
"""
try:
channel = self.__get_spectral_channel(bands)
except Exception as exc:
raise RuntimeError(exc) from exc
dset_name = '/BAND{}/wavelength_map'
ckd = self.__rd_datapoints(dset_name, bands)
if '7' in bands or '8' in bands:
ckd = reject_row257(ckd)
ckd.attrs['long_name'] = f'{channel} wavelength CKD'
return ckd.assign_coords(column=np.arange(ckd.column.size, dtype='u4'))
# ---------- static or dynamic CKD's ----------
[docs]
def darkflux(self, bands: str = '78') -> xr.Dataset:
"""Return dark-flux CKD, SWIR only.
Parameters
----------
bands : str, default: '78'
Tropomi SWIR bands '7', '8' or both '78'
"""
dset_name = '/BAND{}/long_term_swir'
ckd = reject_row257(self.__rd_datapoints(dset_name, bands))
ckd.attrs['long_name'] = 'SWIR dark-flux CKD'
return ckd.assign_coords(column=np.arange(ckd.column.size, dtype='u4'))
[docs]
def offset(self, bands: str = '78') -> xr.Dataset:
"""Return offset CKD, SWIR only.
Parameters
----------
bands : str, default: '78'
Tropomi SWIR bands '7', '8' or both '78'
"""
dset_name = '/BAND{}/analog_offset_swir'
ckd = reject_row257(self.__rd_datapoints(dset_name, bands))
ckd.attrs['long_name'] = 'SWIR offset CKD'
return ckd.assign_coords(column=np.arange(ckd.column.size, dtype='u4'))
[docs]
def pixel_quality(self, bands: str = '78') -> xr.Dataset:
"""Return detector pixel-quality mask (float [0, 1]), SWIR only.
Parameters
----------
bands : str, default: '78'
Tropomi SWIR bands '7', '8' or both '78'
"""
dset_name = '/BAND{}/dpqf_map'
ckd = reject_row257(self.__rd_dataset(dset_name, bands))
ckd.attrs['long_name'] = 'SWIR pixel-quality CKD'
return ckd.assign_coords(column=np.arange(ckd.column.size, dtype='u4'))
[docs]
def dpqf(self, threshold: float = None, bands: str = '78') -> xr.Dataset:
"""Return detector pixel-quality flags (boolean), SWIR only.
Parameters
----------
threshold: float, optional
Value between [0..1], default is to read the threshold from CKD
bands : str, default='78'
Tropomi SWIR bands '7', '8', or both '78'
Returns
-------
numpy ndarray
"""
dpqf = None
if threshold is None:
threshold = self.fid['/BAND7/dpqf_threshold'][:]
# try Static-CKD product
if '/BAND7/dpqf_map' in self.fid:
if bands == '7':
dpqf = self.fid['/BAND7/dpqf_map'][:-1, :] < threshold
elif bands == '8':
dpqf = self.fid['/BAND8/dpqf_map'][:-1, :] < threshold
elif bands == '78':
dpqf_b7 = self.fid['/BAND7/dpqf_map'][:-1, :]
dpqf_b8 = self.fid['/BAND8/dpqf_map'][:-1, :]
dpqf = np.hstack((dpqf_b7, dpqf_b8)) < threshold
else:
# try Dynamic-CKD product
with h5py.File(self.ckd_dyn_file, 'r') as fid:
if bands == '7':
dpqf = fid['/BAND7/dpqf_map'][:-1, :] < threshold
elif bands == '8':
dpqf = fid['/BAND8/dpqf_map'][:-1, :] < threshold
elif bands == '78':
dpqf_b7 = fid['/BAND7/dpqf_map'][:-1, :]
dpqf_b8 = fid['/BAND8/dpqf_map'][:-1, :]
dpqf = np.hstack((dpqf_b7, dpqf_b8)) < threshold
return dpqf
[docs]
def saturation(self) -> xr.Dataset:
"""Return pixel-saturation values (pre-offset), SWIR only."""
dset_name = '/BAND{}/saturation_preoffset'
ckd_file = (self.ckd_dir / 'OCAL'
/ 'ckd.saturation_preoffset.detector4.nc')
with h5py.File(ckd_file, 'r') as fid:
ckd_val = xr.concat((h5_to_xr(fid[dset_name.format(7)]),
h5_to_xr(fid[dset_name.format(8)])),
dim='column')
ckd = xr.Dataset({'value': ckd_val}, attrs=ckd_val.attrs)
ckd = reject_row257(ckd)
ckd.attrs['long_name'] = 'SWIR pixel-saturation CKD (pre-offset)'
return ckd.assign_coords(column=np.arange(ckd.column.size, dtype='u4'))