# ======================================================================================
# Copyright (©) 2015-2025 LCS - Laboratoire Catalyse et Spectrochimie, Caen, France.
# CeCILL-B FREE SOFTWARE LICENSE AGREEMENT
# See full LICENSE agreement in the root directory.
# ======================================================================================
__all__ = ["concatenate", "stack"]
__dataset_methods__ = __all__
from warnings import warn
import numpy as np
from spectrochempy.core.dataset.baseobjects.ndarray import DEFAULT_DIM_NAME
from spectrochempy.core.dataset.coord import Coord
from spectrochempy.core.dataset.coordset import CoordSet
from spectrochempy.utils import exceptions
from spectrochempy.utils.datetimeutils import utcnow
from spectrochempy.utils.decorators import deprecated
from spectrochempy.utils.orderedset import OrderedSet
def concatenate(*datasets, **kwargs):
r"""
Concatenation of `NDDataset` objects along a given axis.
    Any number of `NDDataset` objects can be concatenated (by default
    along the last dimension). For this operation to be defined
    the following must be true:
    #. all inputs must be valid `NDDataset` objects;
    #. units of data must be compatible;
#. concatenation is along the axis specified or the last one;
#. along the non-concatenated dimensions, shapes must match.
Parameters
----------
*datasets : positional `NDDataset` arguments
The dataset(s) to be concatenated to the current dataset. The datasets
must have the same shape, except in the dimension corresponding to axis
(the last, by default).
**kwargs
Optional keyword parameters (see Other Parameters).
Returns
-------
out
        An `NDDataset` created from the concatenation of the input `NDDataset` objects.
Other Parameters
----------------
dims : str, optional, default='x'
The dimension along which the operation is applied.
axis : int, optional
The axis along which the operation is applied.
See Also
--------
stack : Stack of `NDDataset` objects along a new dimension.
Examples
--------
>>> A = scp.read('irdata/nh4y-activation.spg', protocol='omnic')
>>> B = scp.read('irdata/nh4y-activation.scp')
>>> C = scp.concatenate(A[10:], B[3:5], A[:10], axis=0)
>>> A[10:].shape, B[3:5].shape, A[:10].shape, C.shape
((45, 5549), (2, 5549), (10, 5549), (57, 5549))
or
>>> D = A.concatenate(B, B, axis=0)
>>> A.shape, B.shape, D.shape
((55, 5549), (55, 5549), (165, 5549))
>>> E = A.concatenate(B, axis=1)
>>> A.shape, B.shape, E.shape
((55, 5549), (55, 5549), (55, 11098))
"""
    # check the use of the deprecated `force_stack` keyword
if "force_stack" in kwargs:
deprecated("force_stack", replace="method stack()")
return stack(datasets)
# get a copy of input datasets in order that input data are not modified
datasets = _get_copy(datasets)
# get axis from arguments
axis, dim = datasets[0].get_axis(**kwargs)
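    # `axis` is the integer index of the concatenation dimension and `dim` its
    # name, used below to address the corresponding coordinate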
# check shapes, except for dim along which concatenation will be done
shapes = {ds.shape[:axis] + ds.shape[axis + 1 :] for ds in datasets}
if len(shapes) != 1:
raise exceptions.DimensionsCompatibilityError(
"all input arrays must have the same shape",
)
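    # e.g. for axis=0, shapes (45, 5549) and (2, 5549) are compatible since they
    # differ only along the concatenation dimension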
# check units
units = tuple({ds.units for ds in datasets})
if len(units) == 1:
units = datasets[0].units
else:
# check compatibility
for i, u1 in enumerate(units[:-1]):
for u2 in units[i + 1 :]:
if u1.dimensionality != u2.dimensionality:
raise exceptions.UnitsCompatibilityError(
f"Units of the data are {[str(u) for u in units]}. The datasets can't be concatenated",
)
# should be compatible, so convert
units = datasets[0].units
for ds in datasets[1:]:
if ds.units != units:
ds.ito(units)
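    # from this point, all datasets carry the units of the first one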
# concatenate or stack the data array + mask
# --------------------------------------------
sss = []
if datasets[0].origin == "topspin":
# we can use metadata to create new coordinates
metacoords = {}
meta0 = datasets[0].meta
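        # compare the metadata of every other dataset to those of the first one:
        # entries that differ (e.g. variable delays or pulse lengths) are collected
        # in `metacoords` and used below to build new coordinates for the result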
for i, dataset in enumerate(datasets):
if i == 0:
continue
meta = dataset.meta
for key in meta0:
if key in ["file_size", "pprog", "phc0", "phc1", "nsold"]:
continue
keepitem = key if key != "date" else "timestamp"
if np.any(meta0[key][-1] != meta[key][-1]):
if hasattr(meta0[key][-1], "size") and meta0[key][-1].size > 1:
# case of pulse length or delays for instance
                        for j in range(meta0[key][-1].size):
                            if np.any(meta0[key][-1][j] == meta[key][-1][j]):
                                continue
                            itemj = f"{key}{j}"
                            if itemj not in metacoords:
                                metacoords[itemj] = [
                                    meta0[key][-1][j],
                                    meta[key][-1][j],
                                ]
                            else:
                                metacoords[itemj].append(meta[key][-1][j])
continue
if keepitem not in metacoords:
metacoords[keepitem] = [meta0[key][-1], meta[key][-1]]
else:
metacoords[keepitem].append(meta[key][-1])
for _i, dataset in enumerate(datasets):
d = dataset.masked_data
sss.append(d)
sconcat = np.ma.concatenate(sss, axis=axis)
data = np.asarray(sconcat)
mask = sconcat.mask
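    # data and mask are extracted separately from the masked concatenation so they
    # can be assigned to the output's `_data` and `_mask` attributes below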
# now manage coordinates and labels
coords = datasets[0].coordset
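    # the coordinates of the first dataset serve as a template; along `dim`, the
    # coordinate data (and labels, if present) of all datasets are concatenated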
if coords is not None and not coords[dim].is_empty:
labels = []
if coords[dim].is_labeled:
for ds in datasets:
labels.append(ds.coordset[dim].labels)
if coords[dim]._implements() in ["Coord"]:
coords[dim] = Coord(coords[dim])
if labels != []:
coords[dim]._labels = np.concatenate(labels)
elif coords[dim]._implements("CoordSet"):
if labels != []:
labels = np.array(labels, dtype=object)
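                # `labels` is now a 2D object array with one row per dataset and
                # one column per sub-coordinate of the CoordSet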
for i, coord in enumerate(coords[dim]._coords):
try:
labels_not_none = np.all(
labels[:, i] != [None] * len(labels[:, i]),
)
except ValueError:
labels_not_none = True
if labels_not_none:
coord._labels = np.concatenate(list(labels[:, i]))
coord_data_tuple = tuple(ds.coordset[dim].data for ds in datasets)
none_coord = len([x for x in coord_data_tuple if x is None])
if not none_coord:
coords[dim]._data = np.concatenate(coord_data_tuple)
else:
warn(
f"Some dataset(s) coordinates in the {dim} dimension are None.",
stacklevel=2,
)
out = dataset.copy()
out._data = data
if coords is not None:
out._coordset[dim] = coords[dim]
# for topspin data, we can create new coordinates from metadata
if datasets[0].origin == "topspin" and metacoords != {}:
c = []
for key, value in metacoords.items():
c.append(Coord(value, title=key))
out.y = CoordSet(c)
out._mask = mask
out._units = units
out.description = f"Concatenation of {len(datasets)} datasets:\n"
out.description += f"( {datasets[0].name}"
out.title = datasets[0].title
authortuple = (datasets[0].author,)
for dataset in datasets[1:]:
if out.title != dataset.title:
warn(
"Different data title => the title is that of the 1st dataset",
stacklevel=2,
)
if dataset.author not in authortuple:
authortuple = authortuple + (dataset.author,)
out.author = " & ".join([str(author) for author in authortuple])
out.description += f", {dataset.name}"
out.description += " )"
out._date = out._modified = utcnow()
out.history = ["Created by concatenate"]
return out
def stack(*datasets):
"""
Stack of `NDDataset` objects along a new dimension.
Any number of `NDDataset` objects can be stacked. For this operation
    to be defined the following must be true:
    #. all inputs must be valid dataset objects;
#. units of data and axis must be compatible (rescaling is applied
automatically if necessary).
Parameters
----------
*datasets : a series of `NDDataset`
        The dataset(s) to be stacked to the current dataset.
Returns
-------
out
        An `NDDataset` created from the stack of the input datasets.
See Also
--------
concatenate : Concatenate `NDDataset` objects along a given dimension.
Examples
--------
>>> A = scp.read('irdata/nh4y-activation.spg', protocol='omnic')
>>> B = scp.read('irdata/nh4y-activation.scp')
>>> C = scp.stack(A, B)
>>> print(C)
NDDataset: [float64] a.u. (shape: (z:2, y:55, x:5549))
"""
datasets = _get_copy(datasets)
shapes = {ds.shape for ds in datasets}
if len(shapes) != 1:
raise exceptions.DimensionsCompatibilityError(
"all input arrays must have the same shape",
)
# prepend a new dimension
for i, dataset in enumerate(datasets):
dataset._data = dataset.data[np.newaxis]
dataset._mask = dataset.mask[np.newaxis]
newcoord = Coord([i], labels=[dataset.name])
newcoord.name = (OrderedSet(DEFAULT_DIM_NAME) - dataset._dims).pop()
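        # the new dimension takes the first available name not already used by the
        # dataset (e.g. 'z' when the existing dims are 'y' and 'x')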
dataset.add_coordset(newcoord)
dataset.dims = [newcoord.name] + dataset.dims
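    # each dataset now has an extra leading dimension of length 1; concatenating
    # along it yields the stacked dataset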
return concatenate(*datasets, dims=0)
# utility functions
# --------------------
def _get_copy(datasets):
# get a copy of datasets from the input
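    # the datasets may be given either as separate positional arguments or as a
    # single list/tuple of datasets (e.g. `stack(datasets)` in the deprecated
    # force_stack branch of `concatenate`)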
if isinstance(datasets, tuple) and isinstance(datasets[0], list | tuple):
datasets = datasets[0]
return [ds.copy() for ds in datasets]