# Source code for spectrochempy.analysis.decomposition.fast_ica

# ======================================================================================
# Copyright (©) 2015-2025 LCS - Laboratoire Catalyse et Spectrochimie, Caen, France.
# CeCILL-B FREE SOFTWARE LICENSE AGREEMENT
# See full LICENSE agreement in the root directory.
# ======================================================================================
"""Implementation of FastICA model (using scikit-learn library)."""

import traitlets as tr
from numpy.random import RandomState
from sklearn import decomposition

from spectrochempy.analysis._base._analysisbase import DecompositionAnalysis
from spectrochempy.analysis._base._analysisbase import _wrap_ndarray_output_to_nddataset
from spectrochempy.utils.decorators import signature_has_configurable_traits
from spectrochempy.utils.docreps import _docstring
from spectrochempy.utils.traits import NDDatasetType

# Public API of this module: only the FastICA class is exported.
__all__ = ["FastICA"]
# Classes exposing configurable traits to the SpectroChemPy configuration system.
__configurables__ = ["FastICA"]


# ======================================================================================
# class FastICA
# ======================================================================================
[docs] @signature_has_configurable_traits class FastICA(DecompositionAnalysis): _docstring.delete_params("DecompositionAnalysis.see_also", "FastICA") __doc__ = _docstring.dedent( r""" Fast algorithm for Independent Component Analysis (FastICA). A wrapper of `sklearn.decomposition.FastICA`. :term:`ICA` (Independent Component Analysis) extracts the underlying sources of the variability of a set of spectra :math:`X` into the spectral profiles :math:`S^t` of the underlying sources and a mixing matrix :math:`A`. In terms of matrix equation: .. math:: X = \bar{X} + A \cdot S^t + E where :math:`\bar{X}` is the mean of the dataset and :math:`E` is the matrix of residuals. Parameters ---------- %(AnalysisConfigurable.parameters)s See Also -------- %(DecompositionAnalysis.see_also.no_FastICA)s """, ) # ---------------------------------------------------------------------------------- # Runtime Parameters, # only those specific to FastICA, the other being defined in AnalysisConfigurable. # ---------------------------------------------------------------------------------- # define here only the variable that you use in fit or transform functions _fastica = tr.Instance( decomposition.FastICA, help="The instance of sklearn.decomposition.FastICA used in this model", ) # ---------------------------------------------------------------------------------- # Configuration parameters # ---------------------------------------------------------------------------------- n_components = tr.Integer( default_value=None, allow_none=True, help="Number of components (sources) to use.", ).tag(config=True) algorithm = tr.Enum( ["parallel", "deflation"], default_value="parallel", help=("""Specify which algorithm to use for FastICA."""), ).tag(config=True) whiten = tr.Union( (tr.Enum(["arbitrary-variance", "unit-variance"]), tr.Bool(False)), default_value="unit-variance", allow_none=True, help=( r"""Specify the whitening strategy to use. 
- ``"arbitrary-variance"``\ : a whitening with variance arbitrary is used. - "unit-variance" : the whitening matrix is rescaled to ensure that each recovered source has unit variance. - False : the data is already considered to be whitened, and no whitening is performed. """ ), ).tag(config=True) fun = tr.Union( (tr.Enum(["logcosh", "exp", "cube"]), tr.Callable(), tr.Unicode()), default_value="logcosh", allow_none=True, help=( r"""The functional form of the function used in the approximation to neg-entropy. - ``string``\ : could be either ``"logcosh"``, ``"exp"``, or ``"cube"``. - ``callable``\ : You can provide your own function. It should return a tuple containing the value of the function, and of its derivative, in the point. The derivative should be averaged along its last dimension. .. code-block:: def my_g(x): return x ** 3, (3 * x ** 2).mean(axis=-1) .. note:: ``fun`` can be also a serialized function created using dill and base64 python libraries. Normally not used directly, it is here for internal process. """ ), ).tag(config=True) fun_args = tr.Dict( default_value=None, allow_none=True, help=( """Arguments to send to the functional form. If empty or None and if ``fun=="logcosh"`` , `fun_args` will take value ``{alpha : 1.0}``.""" ), ).tag(config=True) tol = tr.Float( default_value=1e-4, help="Tolerance at which the un-mixing matrix is considered to have converged.", ).tag(config=True) max_iter = tr.Integer( default_value=200, help="Maximum number of iterations before timing out.", ).tag(config=True) w_init = tr.Union( (NDDatasetType(),), default_value=None, allow_none=True, help=( r"""Initial un-mixing array. NDDataset or array-like of shape (n_components, n_components). If w_init=None, then an array of values drawn from a normal distribution is used.""" ), ).tag(config=True) whiten_solver = tr.Enum( ["svd", "eigh"], default_value="svd", help=( r"""The solver to use for whitening. 
- ``"svd"``\ : is more stable numerically if the problem is degenerate, and often faster when :term:`n_observations` <= :term:`n_features`. - ``"eigh"``\ : is generally more memory efficient when :term:`n_observations` >= :term:`n_features`, and can be faster when :term:`n_observations` >= 50 * :term:`n_features`. """ ), ).tag(config=True) random_state = tr.Union( (tr.Integer(), tr.Instance(RandomState)), allow_none=True, default_value=None, help=( "Used to initialize ``w_init`` when not specified, with a normal" "distribution. Pass an ``int``, for reproducible results across " "multiple function calls." ), ).tag(config=True) # ---------------------------------------------------------------------------------- # Initialization # ---------------------------------------------------------------------------------- def __init__( self, *, log_level="WARNING", warm_start=False, **kwargs, ): # call the super class for initialisation of the configuration parameters # to do before anything else! super().__init__( log_level=log_level, warm_start=warm_start, **kwargs, ) # initialize sklearn Fast!ICA self._fast_ica = decomposition.FastICA( n_components=self.n_components, algorithm=self.algorithm, whiten=self.whiten, fun=self.fun, fun_args=self.fun_args, tol=self.tol, max_iter=self.max_iter, w_init=self.w_init, whiten_solver=self.whiten_solver, random_state=self.random_state, ) # ---------------------------------------------------------------------------------- # Private methods (overloading abstract classes) # ---------------------------------------------------------------------------------- def _fit(self, X, Y=None): # this method is called by the abstract class fit. # Input X is a np.ndarray # Y is ignored in this model # call the sklearn _fit function on data # _outfit is a tuple handle the eventual output of _fit for further processing. 
# The _outfit members are np.ndarrays _outfit = self._fast_ica.fit(X) self._n_components = int( self._fast_ica.n_components, ) # cast the returned int64 to int return _outfit def _transform(self, X): return self._fast_ica.transform(X) def _inverse_transform(self, X_transform): # we need to set self._fast_ica.components_ to a compatible size but without # destroying the full matrix: store_components_ = self._fast_ica.components_ self._fast_ica.components_ = self._fast_ica.components_[: X_transform.shape[1]] X = self._fast_ica.inverse_transform(X_transform) # restore self._fast_ica.components_ = store_components_ return X def _get_components(self): self._components = self._fast_ica.components_ return self._components _docstring.keep_params("analysis_fit.parameters", "X")
[docs] @_docstring.dedent def fit(self, X): """ Fit the FastICA model on X. Parameters ---------- %(analysis_fit.parameters.X)s Returns ------- %(analysis_fit.returns)s See Also -------- %(analysis_fit.see_also)s """ return super().fit(X, Y=None)
@property @_wrap_ndarray_output_to_nddataset( units=None, title=None, typey="features", typex="components", ) def mixing(self): r""" The pseudo inverse of components. NDDataset of size (`n_features`, `n_components`). It is the linear operator that maps independent sources to the data, and the transpose of `St`. """ return self._fast_ica.mixing_ @property @_wrap_ndarray_output_to_nddataset( units=None, title=None, typey="components", ) def St(self): r""" The spectral profiles of the independant sources. NDDataset of size (`n_components`, `n_features`). It is the transpose of the ``mixing_`` matrix returned by Scikit-Learn. """ return self._fast_ica.mixing_.T @property @_wrap_ndarray_output_to_nddataset( units=None, title=None, typex="components", ) def A(self): r""" The mixing system A. NDDataset of size (`n_observations`, `n_components`). It is the matrix returned by the `transform()` method. """ return self._fast_ica.transform(self.X.data) @property @_wrap_ndarray_output_to_nddataset() def mean(self): r""" The mean of X over features. Only set if `whiten` is True, it is needed (and used) to reconstruct a dataset by ``inverse_transform(A)``. """ return self._fast_ica.mean_ @property def n_iter(self): """ Number of iterations. If the algorithm is “deflation”, n_iter is the maximum number of iterations run across all components. Else they are just the number of iterations taken to converge. """ return self._fast_ica.n_iter_ @property @_wrap_ndarray_output_to_nddataset( units=None, title=None, typey="components", ) def whitening(self): """ NDDataset of shape (n_components, n_features). Only set if whiten is not None. This is the pre-whitening matrix that projects data onto the first n_components principal components. """ if self.whiten: return self._fast_ica.whitening_ return None