# ======================================================================================
# Copyright (©) 2015-2025 LCS - Laboratoire Catalyse et Spectrochimie, Caen, France.
# CeCILL-B FREE SOFTWARE LICENSE AGREEMENT
# See full LICENSE agreement in the root directory.
# ======================================================================================
"""Provides methods for reading data in a directory after a carroucell experiment."""
__all__ = ["read_carroucell"]
__dataset_methods__ = __all__
import datetime
import os
import re
import warnings
import numpy as np
import scipy.interpolate
import xlrd
from spectrochempy.application import info_
from spectrochempy.core.dataset.coord import Coord
from spectrochempy.core.readers.importer import Importer
from spectrochempy.core.readers.importer import _importer_method
from spectrochempy.core.readers.read_omnic import read_omnic
from spectrochempy.utils.datetimeutils import UTC
from spectrochempy.utils.docreps import _docstring
from spectrochempy.utils.file import get_directory_name
from spectrochempy.utils.file import get_filenames
_docstring.delete_params("Importer.see_also", "read_carroucell")
[docs]
@_docstring.dedent
def read_carroucell(directory=None, **kwargs):
    r"""
    Open :file:`.spa` files in a directory after a :term:`carroucell` experiment.
    The files for a given sample are grouped in `NDDataset`\ s (sorted by
    acquisition date).
    The `NDDataset`\ s are returned in a list sorted by sample number.
    When the file containing the temperature data is present, the temperature is read
    and assigned as a label to each spectrum.
    Parameters
    ----------
    directory : `str`, optional
        If not specified, opens a dialog box.
    %(kwargs)s
    Returns
    -------
    %(Importer.returns)s
    Other Parameters
    ----------------
    spectra : :term:`array-like` of 2 `int` (``min`` , ``max`` ), optional, default: `None`
        The first and last spectrum to be loaded as determined by their number.
        If `None` all spectra are loaded.
    discardbg : `bool`, optional, default: `True`
        If `True` : do not load background (sample #9).
    delta_clocks : `int`, optional, default:  0
        Difference in seconds between the clocks used for spectra and temperature
        acquisition. Defined as ``t(thermocouple clock) - t(spectrometer clock)`` .
    See Also
    --------
    %(Importer.see_also.no_read_carroucell)s
    Notes
    -----
    All files are expected to be present in the same directory and their filenames
    are expected to be in the format : :file:`X_samplename_YYY.spa`
    and for the background files : :file:`X_BCKG_YYYBG.spa`
    where ``X`` is the sample holder number and ``YYY`` the spectrum number.
    Examples
    --------
    >>> scp.read_carroucell("irdata/carroucell_samp")
    no temperature file
    [NDDataset: [float64] a.u. (shape: (y:6, x:11098)), NDDataset: ...
    """
    kwargs["filetypes"] = ["Carroucell files (*.spa)"]
    kwargs["protocol"] = ["carroucell"]
    importer = Importer()
    return importer(directory, **kwargs) 
# --------------------------------------------------------------------------------------
# Private methods
# --------------------------------------------------------------------------------------
@_importer_method
def _read_carroucell(*args, **kwargs):
    _, directory = args
    directory = get_directory_name(directory)
    if not directory:  # pragma: no cover
        # probably cancel has been chosen in the open dialog
        info_("No directory was selected.")
        return None
    spectra = kwargs.get("spectra")
    discardbg = kwargs.get("discardbg", True)
    delta_clocks = datetime.timedelta(seconds=kwargs.get("delta_clocks", 0))
    datasets = []
    # get the sorted list of spa files in the directory
    spafiles = sorted(get_filenames(directory, **kwargs)[".spa"])
    spafilespec = [f for f in spafiles if "BCKG" not in f.stem]
    spafileback = [f for f in spafiles if "BCKG" in f.stem]
    # select files
    def prefix(f):
        return f.stem.split("_")[0]
    def number(f):
        return int(f.stem.split("_")[1])
    if spectra is not None:
        [min, max] = spectra
        spafilespec = [f for f in spafilespec if min <= number(f) <= max]
        spafileback = [f for f in spafileback if min <= number(f) <= max]
    # discard BKG files
    spafiles = spafilespec
    if not discardbg:
        spafiles += spafileback
    # merge dataset with the same number
    curfilelist = [spafiles[0]]
    curprefix = prefix(spafiles[0])
    for f in spafiles[1:]:
        if prefix(f) != curprefix:
            ds = read_omnic(
                curfilelist,
                sortbydate=True,
                directory=directory,
                name=curprefix,
            )
            datasets.append(ds)
            curfilelist = [f]
            curprefix = prefix(f)
        else:
            curfilelist.append(f)
    ds = read_omnic(curfilelist, sortbydate=True, directory=directory, name=curprefix)
    datasets.append(ds)
    # Now manage temperature
    Tfile = sorted([f for f in os.listdir(directory) if f[-4:].lower() == ".xls"])
    if len(Tfile) == 0:
        info_("no temperature file")
    elif len(Tfile) > 1:
        warnings.warn(
            "several .xls/.csv files. The temperature will not be read",
            stacklevel=2,
        )
    else:
        Tfile = Tfile[0]
        if Tfile[-4:].lower() == ".xls":
            book = xlrd.open_workbook(os.path.join(directory, Tfile))
            # determine experiment start and end time (thermocouple clock)
            ti = datasets[0].y.labels[0][0] + delta_clocks
            tf = datasets[-1].y.labels[-1][0] + delta_clocks
            # get thermocouple time and T information during the experiment
            t = []
            T = []
            sheet = book.sheet_by_index(0)
            for i in range(9, sheet.nrows):
                try:
                    time = datetime.datetime.strptime(
                        sheet.cell(i, 0).value,
                        "%d/%m/%y %H:%M:%S",
                    ).replace(tzinfo=UTC)
                    if ti <= time <= tf:
                        t.append(time)
                        T.append(sheet.cell(i, 4).value)
                except ValueError:
                    pass
                except TypeError:
                    pass
            # interpolate T = f(timestamp)
            tstamp = [time.timestamp() for time in t]
            # interpolate, except for the first and last points that are extrapolated
            interpolator = scipy.interpolate.interp1d(
                tstamp,
                T,
                fill_value="extrapolate",
                assume_sorted=True,
            )
            for ds in datasets:
                # timestamp of spectra for the thermocouple clock
                tstamp_ds = [
                    (label[0] + delta_clocks).timestamp() for label in ds.y.labels
                ]
                T_ds = interpolator(tstamp_ds)
                newlabels = np.hstack((ds.y.labels, T_ds.reshape((50, 1))))
                ds.y = Coord(title=ds.y.title, data=ds.y.data, labels=newlabels)
    if len(datasets) == 1:
        return datasets[0]  # a single dataset is returned
    # several datasets returned, sorted by sample #
    return sorted(datasets, key=lambda ds: re.split("-|_", ds.name)[0])