Source code for pys5p.rls

#
# This file is part of pyS5p
#
# https://github.com/rmvanhees/pys5p.git
#
# Copyright (c) 2017-2022 SRON - Netherlands Institute for Space Research
#   All Rights Reserved
#
# License:  BSD-3-Clause
"""
Implementation of the Relative Least-Squares regression (RLS).

The RLS regression is used to find the linear dependence y(x) = c0 + c1 * x
that best describes the data before and after correction, using absolute
residuals y_i - (c0 + c1 * x_i) divided by the expected signals c1 * x_i in
the least-squares sum. Offset c0 has an arbitrary size and should not affect
the fit result. Weight factors are determined such to effectively spread the
data points evenly over the whole range of x, making the result less
sensitive to the actual spacing between the data points.
"""
from __future__ import annotations

__all__ = ['rls_fit', 'rls_fit0']

import numpy as np
from numpy import ma


def calc_ma_weights(xdata: np.ndarray, masked: np.ndarray) -> ma.MaskedArray:
    """
    Generate weight factor per pixel.

    Notes
    -----
    It might be that np.apply_along_axis() is slightly faster, however, the
    for-loop 'row in buff' is also very efficient when using the specially
    designed MaskedArray 'buff' which we eventually use to store the weight
    values per pixel.
    """
    buff = ma.array(np.repeat([xdata], masked.shape[0], axis=0), mask=masked)
    for row in buff:
        valid = ma.compressed(row)
        if len(valid) < 2:
            continue
        wght = np.concatenate(([2 * (valid[1] - valid[0])],
                               valid[2:] - valid[0:-2],
                               [2 * (valid[-1] - valid[-2])]))
        row[~row.mask] = wght

    return buff


[docs] def rls_fit(xdata: np.ndarray, ydata: np.ndarray | ma.MaskedArray) -> tuple: """ Perform RLS regression finding linear dependence y(x) = c0 + c1 * x. Parameters ---------- xdata : ndarray, shape (M,) X-coordinates of the M sample points (xdata[i], ydata[..., i]) The array values have to be positive and increasing ydata : MaskedArray or ndarray, shape (..., M) Y-coordinates of the sample points Returns ------- c0, c1, std_c0, std_c1 : tuple of ndarrays coefficients of the linear dependence and their standard deviations Notes ----- Calling a rls-function with MaskedArrays is much slower than with plain ndarrays. The coefficients are set to NaN when the number of samples are less than 2. The standard deviations can only be calculated when the number of samples are larger than two, else the standard deviations are equal to zero. """ # pylint: disable=too-many-locals if xdata.size < 2: raise RuntimeError('too few sample points for a fit') if xdata.size != ydata.shape[-1]: raise RuntimeError('number of samples not equal for xdata, ydata') # perform all computations on 2 dimensional arrays img_shape = ydata.shape[:-1] yy1 = ydata.reshape(-1, xdata.size) # calculate weights if ma.isMaskedArray(ydata): wghts = calc_ma_weights(xdata, ma.getmaskarray(yy1)) else: buff = np.concatenate(([2 * (xdata[1] - xdata[0])], xdata[2:] - xdata[0:-2], [2 * (xdata[-1] - xdata[-2])])) wghts = np.repeat([buff], yy1.shape[0], axis=0) wx1 = wghts / xdata wx2 = wghts / xdata ** 2 # is wx1 / xdata faster? # calculate the Q elements q00 = wghts.sum(axis=1) q01 = wx1.sum(axis=1) q02 = wx2.sum(axis=1) q11 = (wx1 * yy1).sum(axis=1) q12 = (wx2 * yy1).sum(axis=1) q22 = (wx2 * yy1 ** 2).sum(axis=1) # calculate the Z elements zz1 = q00 * q02 - q01 ** 2 zz2 = q00 * q12 - q01 * q11 zz3 = q02 * q11 - q01 * q12 # calculate fit parameters and their uncertainties num = yy1.count(axis=1) if ma.isMaskedArray(ydata) else len(xdata) cc0 = zz2 / zz1 cc1 = zz3 / zz1 if ma.isMaskedArray(ydata): chi2 = ma.abs(q22 - q12 * cc0 - q11 * cc1) / np.clip(num - 2, 1, None) chi2[num <= 2] = 0 sc0 = ma.sqrt(q00 * chi2 / zz1) sc1 = ma.sqrt(q02 * chi2 / zz1) return (cc0.reshape(img_shape).filled(np.nan), cc1.reshape(img_shape).filled(np.nan), sc0.reshape(img_shape).filled(np.nan), sc1.reshape(img_shape).filled(np.nan)) # using only non-MaskedArray functions chi2 = np.abs(q22 - q12 * cc0 - q11 * cc1) / np.clip(num - 2, 1, None) chi2[num <= 2] = 0 sc0 = np.sqrt(q00 * chi2 / zz1) sc1 = np.sqrt(q02 * chi2 / zz1) return (cc0.reshape(img_shape), cc1.reshape(img_shape), sc0.reshape(img_shape), sc1.reshape(img_shape))
[docs] def rls_fit0(xdata: np.ndarray, ydata: np.ndarray | ma.MaskedArray) -> tuple: """ Perform RLS regression finding linear dependence y(x) = c1 * x. Parameters ---------- xdata : ndarray, shape (M,) X-coordinates of the M sample points (xdata[i], ydata[..., i]) The array values have to be positive and increasing ydata : MaskedArray or ndarray, shape (..., M) Y-coordinates of the sample points Returns ------- c1, std_c1 : tuple of ndarrays coefficients of the linear dependence and their standard deviations Notes ----- The coefficients are set to NaN when the number of samples are less than 2. The standard deviations can only be calculated when the number of samples are larger than two, else the standard deviations are equal to zero. """ if xdata.size < 2: raise RuntimeError('too few points for a fit') if xdata.size != ydata.shape[-1]: raise RuntimeError('number of samples not equal for xdata, ydata') # perform all computations on 2 dimensional arrays img_shape = ydata.shape[:-1] yy1 = ydata.reshape(-1, xdata.size) # calculate weights if ma.isMaskedArray(ydata): wghts = calc_ma_weights(xdata, ma.getmaskarray(yy1)) else: buff = np.concatenate(([2 * (xdata[1] - xdata[0])], xdata[2:] - xdata[0:-2], [2 * (xdata[-1] - xdata[-2])])) wghts = np.repeat([buff], yy1.shape[0], axis=0) wx1 = wghts / xdata wx2 = wghts / xdata ** 2 # calculate the Q elements q00 = wghts.sum(axis=1) q11 = (wx1 * yy1).sum(axis=1) q22 = (wx2 * yy1 ** 2).sum(axis=1) # calculate fit parameter and its variance num = yy1.count(axis=1) if ma.isMaskedArray(ydata) else len(xdata) cc1 = q11 / q00 if ma.isMaskedArray(ydata): cc1[num < 1] = ma.masked chi2 = ma.abs(q22 - q00 * cc1 ** 2) / np.clip(num - 1, 1, None) chi2[num <= 1] = ma.masked sc1 = ma.sqrt(chi2 / q00) return (cc1.reshape(img_shape).filled(np.nan), sc1.reshape(img_shape).filled(np.nan)) # using only non-MaskedArray functions cc1[num < 1] = np.nan chi2 = np.abs(q22 - q00 * cc1 ** 2) / np.clip(num - 1, 1, None) chi2[num <= 1] = np.nan sc1 = np.sqrt(chi2 / q00) return cc1.reshape(img_shape), sc1.reshape(img_shape)