Source code for spectrochempy.processing.filter.denoise

import numpy as np
from scipy.signal import savgol_filter

from spectrochempy.application import error_
from spectrochempy.application import info_
from spectrochempy.application import warning_
from spectrochempy.core import get_loglevel

__dataset_methods__ = ["denoise", "despike"]
__all__ = __dataset_methods__


def denoise(dataset, ratio=99.8, **kwargs):
    r"""
    Denoise the data using a PCA method.

    Works only on 2D datasets.

    Parameters
    ----------
    dataset : `NDDataset` or a ndarray-like object
        Input object. Must have two dimensions.
    ratio : `float`, optional, default: 99.8
        Ratio of variance explained in \%. The number of components selected for
        reconstruction is chosen automatically such that the amount of variance
        that needs to be explained is greater than the percentage specified by
        `ratio`.
    **kwargs
        Optional keyword parameters (see Other Parameters).

    Returns
    -------
    `NDDataset`
        Denoised 2D dataset.

    Other Parameters
    ----------------
    dim : str or int, optional, default: 'x'
        Specify on which dimension to apply this method. If `dim` is specified as an
        integer it is equivalent to the usual `axis` numpy parameter.
    log_level : int, optional, default: "WARNING"
        Set the logging level for the method.
    """
    from spectrochempy.analysis.decomposition.pca import PCA

    if dataset.ndim != 2 and dataset.shape[0] > 1:
        error_("Only 2D datasets are supported")
        return dataset

    if ratio > 100.0 or ratio < 0.0:
        error_("ratio must be between 0 and 100")
        return dataset

    ratio = ratio / 100.0

    log_level = kwargs.pop("log_level", get_loglevel())
    dim = kwargs.pop("dim", -1)

    # work on the last axis; swap dimensions if necessary
    axis, _ = dataset.get_axis(dim, negative_axis=True)
    swapped = False
    if axis != -1:
        dataset = dataset.swapdims(axis, -1)
        swapped = True

    pca = PCA(n_components=ratio, svd_solver="full", log_level=log_level)
    pca.fit(dataset)
    info_(
        f"Number of components selected for reconstruction: {pca.n_components} "
        f"[n_observations={dataset.shape[0]}, ratio={ratio * 100:.2f}%]",
    )
    if pca.n_components < 3:
        warning_(
            f"The number of components ({pca.n_components}) selected for "
            f"reconstruction seems very low.\nYou are likely to have a poor "
            f"reconstruction.\nTry to increase the ratio.",
        )

    # reconstruct the data from the selected components
    data = pca.inverse_transform()
    if swapped:
        data = data.swapdims(-1, axis)

    return data

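# --- Usage sketch (illustrative only, not part of the library) ---------------
# A minimal example of how `denoise` might be called from this module. The
# synthetic dataset below is an assumption made purely for illustration; any
# 2D `NDDataset` with several rows behaves the same way:
#
#     import numpy as np
#     from spectrochempy import NDDataset
#
#     rng = np.random.default_rng(0)
#     base = np.sin(np.linspace(0, 10, 200))               # shared signal
#     noisy = base + 0.1 * rng.standard_normal((20, 200))  # 20 noisy spectra
#     nd = NDDataset(noisy)
#     nd_clean = denoise(nd, ratio=99.5)  # keep >= 99.5 % of the variance

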
def despike(dataset, size=9, delta=2, method="katsumoto"):
    """
    Remove spikes from the data.

    The `despike` method can be used to remove cosmic-ray peaks from a spectrum.

    The 'katsumoto' implementation (default) is based on the method described in
    :cite:t:`katsumoto:2003`:

    * In the first step, the moving-average method is employed to detect the spike
      noise. The moving-average window should contain several data points along the
      abscissa that are larger than those of the spikes in the spectrum. If a window
      contains a spike, the value on the ordinate for the spike will show an anomalous
      deviation from the average for this window.
    * In the second step, each data point value identified as noise is replaced by the
      moving-averaged value.
    * In the third step, the moving-average process is applied to the new data set made
      by the second step.
    * In the fourth step, the spikes are identified by comparing the differences between
      the original spectra and the moving-averaged spectra calculated in the third step.

    As a result, the method achieves the reduction of convex spikes.

    The 'whitaker' implementation is based on the method described in
    :cite:t:`whitaker:2018`:

    * The spikes are detected when the z-score of the difference between consecutive
      intensities is larger than the `delta` parameter.
    * The spike intensities are replaced by the average of the intensities in a window
      around the spike, excluding the points that are themselves spikes.

    Parameters
    ----------
    dataset : `NDDataset` or a ndarray-like object
        Input object.
    size : int, optional, default: 9
        Size of the moving-average window ('katsumoto' method) or of the window around
        the spike used to estimate the replacement intensities ('whitaker' method).
    delta : float, optional, default: 2
        Threshold for the detection of spikes.
    method : str, optional, default: 'katsumoto'
        The method to use. Can be 'katsumoto' or 'whitaker'.

    Returns
    -------
    `NDDataset`
        The despiked dataset.
    """
    new = dataset.copy()

    # machine epsilon
    eps = np.finfo(float).eps

    # half-width of the moving window
    s = int((size - 1) / 2)

    for k, X in enumerate(new.data):
        X = X.squeeze()

        if method == "katsumoto":
            # 1) first step: smooth the spectrum with a Savitzky-Golay filter
            A = savgol_filter(X, window_length=size, polyorder=2)

            # 2 and 3) second and third steps: detect the spikes and replace them
            # by the moving average, then the new data are smoothed again
            diff = X - A
            std = delta * np.std(diff)

            # a spike should show a large variation with respect to the std of the
            # difference
            select = abs(diff) >= std
            select = np.logical_or(select, np.roll(select, 1))
            select = np.logical_or(select, np.roll(select, -1))

            # compute weights (spike positions are given a zero weight)
            w = np.ones_like(X)
            w[select] = 0

            # now we must calculate the weighted average, but for efficiency we
            # compute it only around the spike peaks; the other parts are left
            # unchanged.
            res = np.zeros_like(X)
            W = np.zeros_like(X) + eps  # to avoid division by zero
            for j in range(-s, s + 1):
                vj = np.roll(X, j)
                wj = np.roll(w, j)
                res += vj * wj
                W += wj
            A = res / W

            # 4) compare with the original to remove the spike peaks
            X[select] -= (X - A)[select]

        elif method == "whitaker":
            # 1) detrended difference series
            DX = np.zeros_like(X)
            DX[1:] = X[1:] - X[:-1]

            # z-score based on the median absolute deviation
            m = np.median(DX)
            M = np.median(np.abs(DX - m))
            Z = (DX - m) / M
            # force the endpoints to be treated as spikes
            Z[0] = Z[-1] = delta + 1

            # select spikes
            select = abs(Z) >= delta

            # replace each spike by the average of the non-spike points in a
            # window of half-width s around it
            A = np.zeros_like(X)
            for i in [j for j, x in enumerate(select) if x]:
                indexes = np.arange(max(0, i - s), min(len(X), i + s + 1))
                indexes = [index for index in indexes if not select[index]]
                if indexes:
                    A[i] = np.mean(X[indexes])
                else:
                    A[i] = X[i]

            # apply the changes to X
            X[select] -= (X - A)[select]

        new.data[k] = X

    new.history = f"despiked with method={method}, size={size}, delta={delta}"

    return new
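
if __name__ == "__main__":  # pragma: no cover
    # Minimal runnable sketch (not part of the library): remove artificial
    # spikes from a synthetic spectrum with both methods. The synthetic data
    # below are an assumption made purely for illustration.
    from spectrochempy import NDDataset

    rng = np.random.default_rng(0)
    x = np.arange(500.0)
    signal = np.exp(-((x - 250.0) ** 2) / 2000.0)  # a smooth Gaussian band
    noisy = signal + 0.01 * rng.standard_normal(500)
    noisy[[60, 200, 340]] += 1.0  # three cosmic-ray-like spikes

    nd = NDDataset(noisy[np.newaxis])  # despike iterates over the rows of a 2D set

    despiked_k = despike(nd, size=9, delta=2, method="katsumoto")
    despiked_w = despike(nd, size=9, delta=2, method="whitaker")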