# Source code for spectrochempy.core.readers.read_csv

# ======================================================================================
# Copyright (©) 2015-2025 LCS - Laboratoire Catalyse et Spectrochimie, Caen, France.
# CeCILL-B FREE SOFTWARE LICENSE AGREEMENT
# See full LICENSE agreement in the root directory.
# ======================================================================================
__all__ = ["read_csv"]
__dataset_methods__ = __all__

# --------------------------------------------------------------------------------------
# standard and other imports
# --------------------------------------------------------------------------------------
import csv
import locale
import warnings
from datetime import datetime

import numpy as np

from spectrochempy.core import preferences as prefs
from spectrochempy.core.dataset.coord import Coord
from spectrochempy.core.readers.importer import Importer
from spectrochempy.core.readers.importer import _importer_method
from spectrochempy.core.readers.importer import _openfid
from spectrochempy.utils.docreps import _docstring

try:
    locale.setlocale(locale.LC_ALL, "en_US")  # to avoid problems with date format
except Exception:  # pragma: no cover
    try:
        locale.setlocale(locale.LC_ALL, "en_US.utf8")
    except Exception:
        warnings.warn("Could not set locale: en_US or en_US.utf8", stacklevel=2)


# ======================================================================================
# Public functions
# ======================================================================================
_docstring.delete_params("Importer.see_also", "read_csv")


@_docstring.dedent
def read_csv(*paths, **kwargs):
    """
    Open a :file:`.csv` file or a list of :file:`.csv` files.

    This reader is limited to 1D data: the csv file must contain two columns
    [index, data] and no header.

    Parameters
    ----------
    %(Importer.parameters)s

    Returns
    -------
    %(Importer.returns)s

    Other Parameters
    ----------------
    %(Importer.other_parameters)s

    See Also
    --------
    %(Importer.see_also.no_read_csv)s

    Examples
    --------
    >>> scp.read_csv('agirdata/P350/TGA/tg.csv')
    NDDataset: [float64] unitless (shape: (y:1, x:3247))

    Additional information can be stored in the dataset if the origin is given
    (known origins for now: tga or omnic).
    # TODO: define some template to allow adding new origins

    >>> A = scp.read_csv('agirdata/P350/TGA/tg.csv', origin='tga')

    Sometimes the delimiter needs to be adjusted:

    >>> prefs = scp.preferences
    >>> B = scp.read_csv('irdata/IR.CSV', origin='omnic', csv_delimiter=',')
    """
    kwargs["filetypes"] = ["CSV files (*.csv)"]
    kwargs["protocol"] = ["csv"]
    importer = Importer()
    return importer(*paths, **kwargs)
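# --------------------------------------------------------------------------------------
# Note (illustration only): a file suitable for `read_csv` contains two numeric columns
# [index, data] separated by the csv delimiter (e.g. "," or ";") and no header row:
#
#     4000.0,0.0215
#     3999.0,0.0217
#     3998.0,0.0221
#
# A minimal sketch of how such a file could be produced with numpy, given two 1D arrays
# `x` and `data` of equal length (the file name "example.csv" is hypothetical):
#
#     np.savetxt("example.csv", np.column_stack((x, data)), delimiter=",")
#     scp.read_csv("example.csv")
# --------------------------------------------------------------------------------------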
# ======================================================================================
# Private functions
# ======================================================================================
@_importer_method
def _read_csv(*args, **kwargs):
    # read csv file
    dataset, filename = args
    delimiter = kwargs.get("csv_delimiter", prefs.csv_delimiter)

    fid, kwargs = _openfid(filename, mode="r", **kwargs)
    txt = fid.read()
    fid.close()

    # We assume this csv file contains only numbers
    # TODO: write a more general reader
    if ";" in txt:
        # looks like the delimiter is ";"
        # if commas are also present, the file was probably written with a French
        # locale (comma used as the decimal separator)
        txt = txt.replace(",", ".")
        delimiter = ";"

    d = list(csv.reader(txt.splitlines(), delimiter=delimiter))
    d = np.array(d, dtype=float).T

    # First column is the x coordinates
    coordx = Coord(d[0])
    # Create a second coordinate for dimension y of size 1
    coordy = Coord([0])
    # and data is the second column - we make it a vector
    data = d[1].reshape((1, coordx.size))

    # try:
    #     d = np.loadtxt(fid, unpack=True, delimiter=delimiter)
    #     fid.close()
    #
    # except ValueError:
    #     # it might be that the delimiter is not correct (default is ','), but
    #     # french excel export with the french locale, for instance, uses ";".
    #     _delimiter = ";"
    #     try:
    #         if fid:
    #             fid.close()
    #         fid, kwargs = _openfid(filename, mode="r", **kwargs)
    #         d = np.loadtxt(fid, unpack=True, delimiter=_delimiter)
    #         fid.close()
    #
    #     except Exception:  # pragma: no cover
    #         # in French, very often the decimal '.' is replaced by a
    #         # comma: let's try to correct this
    #         if fid:
    #             fid.close()
    #         fid, kwargs = _openfid(filename, mode="r", **kwargs)
    #         txt = fid.read()
    #         fid.close()
    #
    #         txt = txt.replace(",", ".")
    #
    #         fid = io.StringIO(txt)
    #         try:
    #             d = np.loadtxt(fid, unpack=True, delimiter=delimiter)
    #         except Exception:
    #             raise IOError(
    #                 "{} is not a .csv file or its structure cannot be recognized"
    #             )

    # Update the dataset
    dataset.data = data
    dataset.set_coordset(y=coordy, x=coordx)

    # set the additional attributes
    name = filename.stem
    dataset.filename = filename
    dataset.name = kwargs.get("name", name)
    dataset.title = kwargs.get("title", None)
    dataset.units = kwargs.get("units", None)
    dataset.description = kwargs.get("description", f'"{name}" read from .csv file')
    dataset.history = "Read from .csv file"

    # here we can check some particular formats
    origin = kwargs.get("origin", "")
    if "omnic" in origin:
        # this will be treated as a csv export from omnic (IR data)
        dataset = _add_omnic_info(dataset, **kwargs)
    elif "tga" in origin:
        # this will be treated as a csv export from a tga analysis
        dataset = _add_tga_info(dataset, **kwargs)
    elif origin:
        raise NotImplementedError(
            f"Sorry, but reading a csv file with '{origin}' origin is not implemented. "
            "Please remove or change the keyword 'origin'.\n"
            "(Currently implemented csv origins are: `omnic`, `tga`.)",
        )

    # reset modification date to creation date
    dataset._modified = dataset._created

    return dataset


def _add_omnic_info(dataset, **kwargs):
    # get the time and name
    name = desc = dataset.name

    # modify the dataset metadata
    dataset.units = "absorbance"
    dataset.title = "absorbance"
    dataset.name = name
    dataset.description = f"Dataset from .csv file: {desc}\n"
    dataset.history = "Read from omnic exported csv file."
    dataset.origin = "omnic"

    # x axis
    dataset.x.units = "cm^-1"

    # y axis ?
    # Omnic csv exports encode the acquisition date in the file name after an
    # underscore, as "<sample>_<Day Mon dd HH-MM-SS yyyy>", possibly with French
    # day/month names
    if "_" in name:
        name, dat = name.split("_")

        # if needed, convert French weekday names to English
        dat = dat.replace("Lun", "Mon")
        dat = dat[:3].replace("Mar", "Tue") + dat[3:]
        dat = dat.replace("Mer", "Wed")
        dat = dat.replace("Jeu", "Thu")
        dat = dat.replace("Ven", "Fri")
        dat = dat.replace("Sam", "Sat")
        dat = dat.replace("Dim", "Sun")
        # convert French month names to English
        dat = dat.replace("Aout", "Aug")

        # get the dates
        acqdate = datetime.strptime(dat, "%a %b %d %H-%M-%S %Y")

        # Transform back to a timestamp for storage in the Coord object
        # (use datetime.fromtimestamp(d, timezone.utc) to transform back
        # to a datetime object)
        timestamp = acqdate.timestamp()

        dataset.y = Coord(np.array([timestamp]), name="y")
        dataset.set_coordtitles(y="acquisition timestamp (GMT)", x="wavenumbers")
        dataset.y.labels = np.array([[acqdate], [name]])
        dataset.y.units = "s"

    # reset modification date to creation date
    dataset._modified = dataset._created

    return dataset


def _add_tga_info(dataset, **kwargs):
    # for TGA, some additional information is needed; we add it here
    dataset.x.units = "hour"
    dataset.units = "percent"
    dataset.x.title = "time-on-stream"
    dataset.title = "mass change"
    dataset.origin = "tga"

    return dataset