Warning

You are reading the documentation related to the development version. Go here if you are looking for the documentation of the stable release.

Source code for spectrochempy.processing.filter.denoise

import numpy as np
from scipy.signal import savgol_filter

from spectrochempy.application import error_
from spectrochempy.application import info_
from spectrochempy.application import warning_
from spectrochempy.core import get_loglevel

__dataset_methods__ = ["denoise", "despike"]
__all__ = __dataset_methods__


[docs] def denoise(dataset, ratio=99.8, **kwargs): r""" Denoise the data using a PCA method. Work only on 2D dataset. Parameters ---------- dataset : `NDDataset` or a ndarray-like object Input object. Must have two dimensions. ratio : `float`, optional, default: 99.8% Ratio of variance explained in \%. The number of components selected for reconstruction is chosen automatically such that the amount of variance that needs to be explained is greater than the percentage specified by `ratio` . **kwargs Optional keyword parameters (see Other Parameters). Returns ------- `NDDataset` Denoised 2D dataset Other Parameters ---------------- dim : str or int, optional, default='x'. Specify on which dimension to apply this method. If `dim` is specified as an integer it is equivalent to the usual `axis` numpy parameter. log_level : int, optional, default: "WARNING" Set the logging level for the method. """ from spectrochempy.analysis.decomposition.pca import PCA if dataset.ndim != 2 and dataset.shape[0] > 1: error_("Only 2D dataset are supported") return dataset if ratio > 100.0 or ratio < 0.0: error_("ratio must be between 0 and 100") return dataset ratio = ratio / 100.0 log_level = kwargs.pop("log_level", get_loglevel()) dim = kwargs.pop("dim", -1) axis, _ = dataset.get_axis(dim, negative_axis=True) swapped = False if axis != -1: dataset = dataset.swapdims(axis, -1) swapped = True pca = PCA(n_components=ratio, svd_solver="full", log_level=log_level) pca.fit(dataset) info_( f"Number of components selected for reconstruction: {pca.n_components} " f"[n_observations={dataset.shape[0]}, ratio={ratio * 10: .2f}%]", ) if pca.n_components < 3: warning_( f"The number of components ({pca.n_components}) selected for " f"reconstruction seems very low.\nYour likely to have a poor " f"reconstruction.\nTry to increase the ratio.", ) data = pca.inverse_transform() if swapped: data = data.swapdims(-1, axis) return data
[docs] def despike(dataset, size=9, delta=2, method="katsumoto"): """ Remove spikes from the data. The `despike` methods can be used to remove cosmic ray peaks from a spectrum. The 'katsumoto' implementation (default) is based on the method is described in :cite:t:`katsumoto:2003`: * In the first step, the moving-average method is employed to detect the spike noise. The moving-average window should contain several data points along the abscissa that are larger than those of the spikes in the spectrum. If a window contains a spike, the value on the ordinate for the spike will show an anomalous deviation from the average for this window. * In the second step, each data point value identified as noise is replaced by the moving-averaged value. * In the third step, the moving-average process is applied to the new data set made by the second step. * In the fourth step, the spikes are identified by comparing the differences between the original spectra and the moving-averaged spectra calculated in the third step. As a result, the proposed method realizes the reduction of convex spikes. The 'whitaker' implementation is based on the method is described in :cite:t:`whitaker:2018`: * The spikes are detected when the zscore of the difference between consecutive intensities is larger than the delta parameter. * The spike intensities are replaced by the average of the intensities in a window around the spike, excluding the points that are spikes. Parameters ---------- dataset : `NDDataset` or a ndarray-like object Input object. size : int, optional, default: 9 Size of the moving average window ('katsumoto' method) or the size of the window around the spike to estimate the intensities ('whitaker' method). delta : float, optional, default: 2 Set the threshold for the detection of spikes. method : str, optional, default: 'katsumoto' The method to use. Can be 'katsumoto' or 'whitaker' Returns ------- `NDdataset` The despike dataset """ new = dataset.copy() # machine epsilon eps = np.finfo(float).eps s = int((size - 1) / 2) for k, X in enumerate(new.data): X = X.squeeze() if method == "katsumoto": # 1) first step : savgol filter A = savgol_filter(X, window_length=size, polyorder=2) # 2 and 3) second and third step : detect spike and replace spkike by the moving # average and the new data are smoothed again diff = X - A std = delta * np.std(diff) # spike should have a large variation with respect to the std of the difference select = abs(diff) >= std select = np.logical_or(select, np.roll(select, 1)) select = np.logical_or(select, np.roll(select, -1)) # compute weights w = np.ones_like(X) w[select] = 0 # now we must calculate the weighted average, but for efficiency we will compute it # only around where spike peaks are, the other part should be unchanged. res = np.zeros_like(X) W = np.zeros_like(X) + eps # to avoid division by zero r = range(-s, s + 1) for j in r: vj = np.roll(X, j) wj = np.roll(w, j) res += vj * wj W += wj A = res / W # 4) compare with original to remove spike peaks X[select] -= (X - A)[select] elif method == "whitaker": # 1) detrended difference series DX = np.zeros_like(X) DX[1:] = X[1:] - X[:-1] # zscore m = np.median(DX) M = np.median(np.abs(DX - m)) Z = (DX - m) / M Z[0] = Z[-1] = delta + 1 # select spikes select = abs(Z) >= delta # replace spikes by average A = np.zeros_like(X) for i in [j for j, x in enumerate(select) if x]: indexes = np.arange(max(0, i - s), min(len(X), i + s)) indexes = [index for index in indexes if not select[index]] if indexes != []: A[i] = np.mean(X[indexes]) else: A[i] = X[i] # makes change in X X[select] -= (X - A)[select] new.data[k] = X new.history = f"despiked with method={method}, size={size}, delta={delta}" return new