# ======================================================================================
# Copyright (©) 2015-2025 LCS - Laboratoire Catalyse et Spectrochimie, Caen, France.
# CeCILL-B FREE SOFTWARE LICENSE AGREEMENT
# See full LICENSE agreement in the root directory.
# ======================================================================================
"""Implementation of NMF model (using scikit-learn library)."""
import logging
import traitlets as tr
from numpy.random import RandomState
from sklearn import decomposition
from spectrochempy.analysis._base._analysisbase import DecompositionAnalysis
from spectrochempy.utils.decorators import deprecated
from spectrochempy.utils.decorators import signature_has_configurable_traits
from spectrochempy.utils.docreps import _docstring
__all__ = ["NMF"]
__configurables__ = ["NMF"]
# ======================================================================================
# class NMF
# ======================================================================================
@signature_has_configurable_traits
class NMF(DecompositionAnalysis):
_docstring.delete_params("DecompositionAnalysis.see_also", "NMF")
__doc__ = _docstring.dedent(
r"""
Non-Negative Matrix Factorization (NMF).

Use `sklearn.decomposition.NMF`.

Find two non-negative matrices (``W``, ``H``), *i.e.,* matrices with all
non-negative elements, whose product approximates the non-negative matrix `X`.
This factorization can be used, for example, for dimensionality reduction,
source separation or topic extraction.

Parameters
----------
%(AnalysisConfigurable.parameters)s

See Also
--------
%(DecompositionAnalysis.see_also.no_NMF)s
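
Examples
--------
A minimal usage sketch; the data file below is only illustrative, and the
shift before fitting ensures the input is non-negative as NMF requires:

>>> import spectrochempy as scp
>>> X = scp.read("irdata/nh4y-activation.spg")
>>> X = X - X.min()  # make the data non-negative
>>> model = scp.NMF(n_components=3)
>>> _ = model.fit(X)
>>> W = model.transform(X)  # reduced representation (the W matrix)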
""",
)
# ----------------------------------------------------------------------------------
# Runtime parameters: only those specific to NMF; the others are defined
# in AnalysisConfigurable.
# ----------------------------------------------------------------------------------
# Define here only the variables used in the fit or transform methods.
_nmf = tr.Instance(
decomposition.NMF,
help="The instance of sklearn.decomposition.NMF used in this model",
)
# ----------------------------------------------------------------------------------
# Configuration parameters
# ----------------------------------------------------------------------------------
n_components = tr.Integer(
default_value=2,
allow_none=True,
help="Number of components to use.",
).tag(config=True)
init = tr.Enum(
["random", "nndsvd", "nndsvda", "nndsvdar", "custom"],
default_value=None,
allow_none=True,
help=(
"Method used to initialize the procedure.\n\n"
"Valid options:\n\n"
"* `None` : 'nndsvda' if n_components <= min(n_samples, n_features), "
"otherwise random.\n"
"* `random` : non-negative random matrices, scaled with:\n"
" sqrt(X.mean() / n_components)\n"
"* `nndsvd` : Nonnegative Double Singular Value Decomposition (NNDSVD) "
"initialization (better for sparseness)\n"
"* `nndsvda` : NNDSVD with zeros filled with the average of X "
"(better when sparsity is not desired)\n"
"* `nndsvdar` NNDSVD with zeros filled with small random values "
"(generally faster, less accurate alternative to NNDSVDa "
"for when sparsity is not desired)\n"
"* `custom` : use custom matrices W and H."
),
).tag(config=True)
solver = tr.Enum(
["cd", "mu"],
default_value="cd",
help=(
"Numerical solver to use:\n"
"- 'cd' is a Coordinate Descent solver.\n"
"- 'mu' is a Multiplicative Update solver."
),
).tag(config=True)
beta_loss = tr.Union(
(tr.Float(), tr.Enum(["frobenius", "kullback-leibler", "itakura-saito"])),
default_value="frobenius",
help=(
"Beta divergence to be minimized, measuring the distance between X"
"and the dot product WH. Note that values different from 'frobenius' "
"(or 2) and 'kullback-leibler' (or 1) lead to significantly slower fits.\n"
"Note that for beta_loss <= 0 (or 'itakura-saito'), the input matrix X "
"cannot contain zeros. Used only in 'mu' solver."
),
).tag(config=True)
tol = tr.Float(default_value=1e-4, help="Tolerance of the stopping condition.").tag(
config=True,
)
max_iter = tr.Integer(
default_value=200,
help="Maximum number of iterations before timing out.",
).tag(config=True)
random_state = tr.Union(
(tr.Integer(), tr.Instance(RandomState)),
allow_none=True,
default_value=None,
help=(
"Used for initialisation (when `init` == 'nndsvdar' or 'random'), and "
"in Coordinate Descent. Pass an int, for reproducible results across "
"multiple function calls."
),
).tag(config=True)
alpha_W = tr.Float( # noqa: N815
default_value=0.0,
help="Constant that multiplies the regularization terms of `W` . Set it to zero"
"(default) to have no regularization on `W` .",
).tag(config=True)
alpha_H = tr.Union( # noqa: N815
(tr.Float(), tr.Enum(["same"])),
default_value="same",
help=(
"Constant that multiplies the regularization terms of `H` . Set it to zero"
'to have no regularization on `H` . If "same" (default), it takes the same'
"value as `alpha_W` ."
),
).tag(config=True)
l1_ratio = tr.Float(
default_value=0.0,
help=(
"The regularization mixing parameter, with 0 <= l1_ratio <= 1.\n"
"- For l1_ratio = 0 the penalty is an elementwise L2 penalty (aka Frobenius "
"Norm).\n"
"- For l1_ratio = 1 it is an elementwise L1 penalty.\n"
"- For 0 < l1_ratio < 1, the penalty is a combination of L1 and L2."
),
).tag(config=True)
shuffle = tr.Bool(
default_value=False,
help="If true, randomize the order of coordinates in the CD solver.",
).tag(config=True)
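# A hypothetical configuration sketch (values are illustrative): the traits
# above are set as keyword arguments at construction time and forwarded to
# the wrapped sklearn estimator, e.g.
#
#     NMF(n_components=3, solver="mu", beta_loss="kullback-leibler", max_iter=500)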
# ----------------------------------------------------------------------------------
# Initialization
# ----------------------------------------------------------------------------------
def __init__(
self,
*,
log_level="WARNING",
warm_start=False,
**kwargs,
):
if "used_components" in kwargs:
deprecated("used_components", replace="n_components", removed="0.7")
kwargs["n_components"] = kwargs.pop("used_components")
# call the super class for initialisation of the configuration parameters
# to do before anything else!
super().__init__(
log_level=log_level,
warm_start=warm_start,
**kwargs,
)
# initialize sklearn NMF
self._nmf = decomposition.NMF(
n_components=self.n_components,
init=self.init,
solver=self.solver,
beta_loss=self.beta_loss,
tol=self.tol,
max_iter=self.max_iter,
random_state=self.random_state,
alpha_W=self.alpha_W,
alpha_H=self.alpha_H,
l1_ratio=self.l1_ratio,
shuffle=self.shuffle,
verbose=self.parent.log_level == logging.INFO,
)
# ----------------------------------------------------------------------------------
# Private methods (overloading abstract classes)
# ----------------------------------------------------------------------------------
def _fit(self, X, Y=None):
# This method is called by the abstract class fit.
# Input X is a np.ndarray; Y is ignored in this model.
# Call the sklearn fit method on the data.
# _outfit holds the eventual output of _fit for further processing
# (here, the fitted sklearn estimator).
_outfit = self._nmf.fit(X)
self._n_components = int(
self._nmf.n_components_,
)  # use the fitted n_components_ attribute and cast it to a plain int
return _outfit
def _transform(self, X):
return self._nmf.transform(X)
def _inverse_transform(self, X_transform):
# we need to set self._nmf.components_ to a compatible size but without
# destroying the full matrix:
store_components_ = self._nmf.components_
self._nmf.components_ = self._nmf.components_[: X_transform.shape[1]]
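# (sklearn's NMF.inverse_transform computes X_transform @ components_, so a
# (n_samples, k) X_transform needs only the first k rows of components_)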
X = self._nmf.inverse_transform(X_transform)
# restore
self._nmf.components_ = store_components_
return X
def _get_components(self):
self._components = self._nmf.components_
return self._components
_docstring.keep_params("analysis_fit.parameters", "X")
@_docstring.dedent
def fit(self, X):
"""
Fit the NMF model on X.

Parameters
----------
%(analysis_fit.parameters.X)s

Returns
-------
%(analysis_fit.returns)s

See Also
--------
%(analysis_fit.see_also)s
"""
return super().fit(X, Y=None)