#
# This file is part of pyS5p
#
# https://github.com/rmvanhees/pys5p.git
#
# Copyright (c) 2017-2022 SRON - Netherlands Institute for Space Research
# All Rights Reserved
#
# License: BSD-3-Clause
"""This module contains the class `LV2io` to access Tropomi level-2 products."""
from __future__ import annotations
__all__ = ['LV2io']
from datetime import datetime, timedelta
from typing import TYPE_CHECKING
import h5py
import numpy as np
from moniplot.image_to_xarray import data_to_xr, h5_to_xr
from netCDF4 import Dataset
if TYPE_CHECKING:
from pathlib import Path
import xarray as xr
# - global parameters ------------------------------
# - local functions --------------------------------
# - class definition -------------------------------
[docs]
class LV2io:
"""This class should offer all the necessary functionality to read Tropomi
S5P_OFFL_L2 products.
Parameters
----------
lv2_product : Path
full path to S5P Tropomi level 2 product
Notes
-----
The Python h5py module can read the operational netCDF4 products without
any problems, however, the SRON science products contain incompatible
attributes. Thus, should be fixed when more up-to-date netCDF software is
used to generate the products. Currently, the Python netCDF4 module is
used to read the science products.
"""
def __init__(self, lv2_product: Path):
"""Initialize access to an S5P_L2 product."""
if not lv2_product.is_file():
raise FileNotFoundError(f'{lv2_product.name} does not exist')
# initialize class-attributes
self.filename = lv2_product
# open LV2 product as HDF5 file
if self.science_product:
self.fid = Dataset(lv2_product, 'r')
self.ground_pixel = self.fid['/instrument/ground_pixel'][:].max()
self.ground_pixel += 1
self.scanline = self.fid['/instrument/scanline'][:].max()
self.scanline += 1
# alternative set flag sparse
if self.fid['/instrument/scanline'].size % self.ground_pixel != 0:
raise ValueError('not all scanlines are complete')
else:
self.fid = h5py.File(lv2_product, 'r')
self.ground_pixel = self.fid['/PRODUCT/ground_pixel'].size
self.scanline = self.fid['/PRODUCT/scanline'].size
def __iter__(self):
"""Allow itertion."""
for attr in sorted(self.__dict__):
if not attr.startswith('__'):
yield attr
def __enter__(self):
"""Initiate the context manager."""
return self
def __exit__(self, exc_type, exc_value, traceback):
"""Exit the context manager."""
self.close()
return False # any exception is raised by the with statement.
[docs]
def close(self):
"""Close the product."""
if self.fid is not None:
self.fid.close()
# ----- Class properties --------------------
@property
def science_product(self) -> bool:
"""Check if product is a science product."""
science_inst = b'SRON Netherlands Institute for Space Research'
res = False
with h5py.File(self.filename) as fid:
if 'institution' in fid.attrs \
and fid.attrs['institution'] == science_inst:
res = True
return res
@property
def orbit(self) -> int:
"""Return reference orbit number."""
if self.science_product:
return int(self.__nc_attr('orbit', 'l1b_file'))
return int(self.__h5_attr('orbit', None)[0])
@property
def algorithm_version(self) -> str | None:
"""Return version of the level 2 algorithm."""
res = self.get_attr('algorithm_version')
return res if res is not None else self.get_attr('version')
@property
def processor_version(self) -> str | None:
"""Return version of the level 2 processor."""
res = self.get_attr('processor_version')
return res if res is not None else self.get_attr('version')
@property
def product_version(self) -> str:
"""Return version of the level 2 product."""
res = self.get_attr('product_version')
return res if res is not None else self.get_attr('version')
@property
def coverage_time(self) -> tuple:
"""Return start and end of the measurement coverage time."""
return (self.get_attr('time_coverage_start'),
self.get_attr('time_coverage_end'))
@property
def creation_time(self) -> str:
"""Return creation date/time of the level 2 product."""
return self.get_attr('date_created')
# ----- Attributes --------------------
def __h5_attr(self, attr_name: str,
ds_name: str | None) -> np.ndarray | None:
"""Read attributes from operational products using hdf5."""
if ds_name is not None:
dset = self.fid[f'/PRODUCT/{ds_name}']
if attr_name not in dset.attrs:
return None
attr = dset.attrs[attr_name]
else:
if attr_name not in self.fid.attrs:
return None
attr = self.fid.attrs[attr_name]
if isinstance(attr, bytes):
return attr.decode('ascii')
return attr
def __nc_attr(self, attr_name: str, ds_name: str) -> np.ndarray | None:
"""Read attributes from science products using netCDF4."""
if ds_name is not None:
for grp_name in ['target_product', 'side_product', 'instrument']:
if grp_name not in self.fid.groups:
continue
if ds_name not in self.fid[grp_name].variables:
continue
dset = self.fid[f'/{grp_name}/{ds_name}']
if attr_name in dset.ncattrs():
return dset.getncattr(attr_name)
return None
if attr_name not in self.fid.ncattrs():
return None
return self.fid.getncattr(attr_name)
[docs]
def get_attr(self, attr_name: str,
ds_name: str | None = None) -> np.ndarray | None:
"""Obtain value of an HDF5 file attribute or dataset attribute.
Parameters
----------
attr_name : str
name of the attribute
ds_name : str, optional
name of dataset, default is to read the product attributes
"""
if self.science_product:
return self.__nc_attr(attr_name, ds_name)
return self.__h5_attr(attr_name, ds_name)
# ----- Time information ---------------
@property
def ref_time(self) -> datetime | None:
"""Return reference start time of measurements."""
if self.science_product:
return None
return (datetime(2010, 1, 1, 0, 0, 0)
+ timedelta(seconds=int(self.fid['/PRODUCT/time'][0])))
[docs]
def get_time(self) -> np.ndarray | None:
"""Return start time of measurement per scan-line."""
if self.science_product:
buff = self.get_dataset('time')[::self.ground_pixel, :]
return np.array([datetime(*x) for x in buff])
buff = self.fid['/PRODUCT/delta_time'][0, :]
return np.array([self.ref_time + timedelta(seconds=x / 1e3)
for x in buff])
# ----- Geolocation --------------------
def __h5_geo_data(self, geo_dsets: str) -> dict:
"""Read geolocation datasets from operational products using HDF5."""
res = {}
if geo_dsets is None:
geo_dsets = 'latitude,longitude'
for key in geo_dsets.split(','):
for grp_name in ['/PRODUCT', '/PRODUCT/SUPPORT_DATA/GEOLOCATIONS']:
if key in self.fid[grp_name]:
res[key] = np.squeeze(
self.fid[f'{grp_name}/{key}'])
continue
return res
def __nc_geo_data(self, geo_dsets: str) -> dict:
"""Read geolocation datasets from science products using netCDF4."""
res = {}
if geo_dsets is None:
geo_dsets = 'latitude_center,longitude_center'
for key in geo_dsets.split(','):
if key in self.fid['/instrument'].variables:
ds_name = f'/instrument/{key}'
res[key] = self.fid[ds_name][:].reshape(
self.scanline, self.ground_pixel)
return res
[docs]
def get_geo_data(self, geo_dsets: str | None = None) -> dict:
"""Return data of selected datasets from the GEOLOCATIONS group.
Parameters
----------
geo_dsets : str, optional
Name(s) of datasets, comma separated. Default:
* operational: 'latitude,longitude'
* science: 'latitude_center,longitude_center'
Returns
-------
dict
dictionary with arrays of selected datasets
"""
if self.science_product:
return self.__nc_geo_data(geo_dsets)
return self.__h5_geo_data(geo_dsets)
# ----- Footprints --------------------
def __h5_geo_bounds(self, extent: list[float, float, float, float],
data_sel: tuple[slice | int]) -> tuple:
"""Read bounds of latitude/longitude from operational products
using HDF5.
"""
if extent is not None:
if len(extent) != 4:
raise ValueError('parameter extent must have 4 elements')
lats = self.fid['/PRODUCT/latitude'][0, ...]
lons = self.fid['/PRODUCT/longitude'][0, ...]
indx = ((lons >= extent[0]) & (lons <= extent[1])
& (lats >= extent[2]) & (lats <= extent[3])).nonzero()
data_sel = np.s_[indx[0].min():indx[0].max(),
indx[1].min():indx[1].max()]
gid = self.fid['/PRODUCT/SUPPORT_DATA/GEOLOCATIONS']
if data_sel is None:
lat_bounds = gid['latitude_bounds'][0, ...]
lon_bounds = gid['longitude_bounds'][0, ...]
else:
data_sel0 = (0,) + data_sel + (slice(None),)
lat_bounds = gid['latitude_bounds'][data_sel0]
lon_bounds = gid['longitude_bounds'][data_sel0]
return data_sel, lon_bounds, lat_bounds
def __nc_geo_bounds(self, extent: list[float, float, float, float],
data_sel: tuple[slice | int]) -> tuple:
"""Read bounds of latitude/longitude from science products
using netCDF4.
"""
if extent is not None:
if len(extent) != 4:
raise ValueError('parameter extent must have 4 elements')
lats = self.fid['/instrument/latitude_center'][:].reshape(
self.scanline, self.ground_pixel)
lons = self.fid['/instrument/longitude_center'][:].reshape(
self.scanline, self.ground_pixel)
indx = ((lons >= extent[0]) & (lons <= extent[1])
& (lats >= extent[2]) & (lats <= extent[3])).nonzero()
data_sel = np.s_[indx[0].min():indx[0].max(),
indx[1].min():indx[1].max()]
gid = self.fid['/instrument']
lat_bounds = gid['latitude_corners'][:].data.reshape(
self.scanline, self.ground_pixel, 4)
lon_bounds = gid['longitude_corners'][:].data.reshape(
self.scanline, self.ground_pixel, 4)
if data_sel is not None:
lat_bounds = lat_bounds[data_sel + (slice(None),)]
lon_bounds = lon_bounds[data_sel + (slice(None),)]
return data_sel, lon_bounds, lat_bounds
[docs]
def get_geo_bounds(self,
extent: list[float, float, float, float] | None,
data_sel: tuple[slice | int] | None) -> np.ndarray | tuple:
"""Return bounds of latitude/longitude as a mesh for plotting.
Parameters
----------
extent : list
select data to cover a region with geolocation defined by:
lon_min, lon_max, lat_min, lat_max and return numpy slice
data_sel : numpy slice
a 3-dimensional numpy slice: time, scan_line, ground_pixel
Note 'data_sel' will be overwritten when 'extent' is defined
Returns
-------
data_sel : numpy slice
Select slice of data which covers geolocation defined by extent.
Only provided if extent is not None.
out : dictionary
With numpy arrays for latitude and longitude
"""
if self.science_product:
res = self.__nc_geo_bounds(extent, data_sel)
else:
res = self.__h5_geo_bounds(extent, data_sel)
data_sel, lon_bounds, lat_bounds = res
res = {}
_sz = lon_bounds.shape
res['longitude'] = np.empty((_sz[0]+1, _sz[1]+1), dtype=float)
res['longitude'][:-1, :-1] = lon_bounds[:, :, 0]
res['longitude'][-1, :-1] = lon_bounds[-1, :, 1]
res['longitude'][:-1, -1] = lon_bounds[:, -1, 1]
res['longitude'][-1, -1] = lon_bounds[-1, -1, 2]
res['latitude'] = np.empty((_sz[0]+1, _sz[1]+1), dtype=float)
res['latitude'][:-1, :-1] = lat_bounds[:, :, 0]
res['latitude'][-1, :-1] = lat_bounds[-1, :, 1]
res['latitude'][:-1, -1] = lat_bounds[:, -1, 1]
res['latitude'][-1, -1] = lat_bounds[-1, -1, 2]
if extent is None:
return res
return data_sel, res
# ----- Datasets (numpy) --------------------
def __h5_dataset(self, name: str, data_sel: tuple[slice | int],
fill_as_nan: bool) -> np.ndarray:
"""Read dataset from operational products using HDF5."""
fillvalue = float.fromhex('0x1.ep+122')
if name not in self.fid['/PRODUCT']:
raise ValueError(f'dataset {name} for found')
dset = self.fid[f'/PRODUCT/{name}']
if data_sel is None:
if dset.dtype == np.float32:
res = dset.astype(float)[0, ...]
else:
res = dset[0, ...]
else:
if dset.dtype == np.float32:
res = dset.astype(float)[(0,) + data_sel]
else:
res = dset[(0,) + data_sel]
if fill_as_nan and dset.attrs['_FillValue'] == fillvalue:
res[(res == fillvalue)] = np.nan
return res
def __nc_dataset(self, name: str, data_sel: tuple[slice | int],
fill_as_nan: bool) -> np.ndarray:
"""Read dataset from science products using netCDF4."""
if name in self.fid['/target_product'].variables:
group = '/target_product'
elif name in self.fid['/instrument'].variables:
group = '/instrument'
else:
raise ValueError(f'dataset {name} for found')
dset = self.fid[f'{group}/{name}']
if dset.size == self.scanline * self.ground_pixel:
res = dset[:].reshape(self.scanline, self.ground_pixel)
else:
res = dset[:]
if data_sel is not None:
res = res[data_sel]
if fill_as_nan:
return res.filled(np.nan)
return res.data
[docs]
def get_dataset(self, name: str,
data_sel: tuple[slice | int] | slice | None = None,
fill_as_nan: bool = True) -> np.ndarray:
"""Read level 2 dataset from PRODUCT group.
Parameters
----------
name : string
name of dataset with level 2 data
data_sel : numpy slice
a 3-dimensional numpy slice: time, scan_line, ground_pixel
fill_as_nan : boolean
Replace (float) FillValues with Nan's, when True
Returns
-------
numpy.ndarray
"""
if self.science_product:
return self.__nc_dataset(name, data_sel, fill_as_nan)
return self.__h5_dataset(name, data_sel, fill_as_nan)
# ----- Dataset (xarray) --------------------
def __h5_data_as_xds(self, name: str,
data_sel: tuple[slice | int]) -> xr.DataArray:
"""Read dataset from group target_product using HDF5.
Input: operational product
Return: xarray.Dataset
"""
if name not in self.fid['/PRODUCT']:
raise ValueError(f'dataset {name} for found')
dset = self.fid[f'/PRODUCT/{name}']
# ToDo handle parameter mol_m2
return h5_to_xr(dset, (0,) + data_sel).squeeze()
def __nc_data_as_xds(self, name: str,
data_sel: tuple[slice | int]) -> xr.DataArray:
"""Read dataset from group PRODUCT using netCDF4.
Input: science product
Return: xarray.DataArray
"""
if name in self.fid['/target_product'].variables:
group = '/target_product/'
elif name in self.fid['/instrument'].variables:
group = '/instrument/'
else:
raise ValueError('dataset {name} for found')
return data_to_xr(self.get_dataset(group + name, data_sel),
dims=['scanline', 'ground_pixel'], name=name,
long_name=self.get_attr('long_name', name),
units=self.get_attr('units', name))
[docs]
def get_data_as_xds(self, name: str,
data_sel: tuple[slice | int] | None = None) \
-> xr.DataArray:
"""Read dataset from group PRODUCT/target_product group.
Parameters
----------
name : str
name of dataset with level 2 data
data_sel : numpy slice
a 3-dimensional numpy slice: time, scan_line, ground_pixel
Returns
-------
xarray.DataArray
"""
if self.science_product:
return self.__nc_data_as_xds(name, data_sel)
return self.__h5_data_as_xds(name, data_sel)