# ======================================================================================
# Copyright (©) 2015-2025 LCS - Laboratoire Catalyse et Spectrochimie, Caen, France.
# CeCILL-B FREE SOFTWARE LICENSE AGREEMENT
# See full LICENSE agreement in the root directory.
# ======================================================================================
"""Define a generic class to import directories, files, and contents."""

__all__ = ["read", "read_dir"]  # , "read_remote"]
__dataset_methods__ = __all__

import io
import re
from warnings import warn
from zipfile import ZipFile

import requests
import yaml
from traitlets import Dict
from traitlets import HasTraits
from traitlets import List
from traitlets import Type
from traitlets import Unicode

from spectrochempy.application import info_
from spectrochempy.application import warning_
from spectrochempy.utils.docreps import _docstring
from spectrochempy.utils.exceptions import DimensionsCompatibilityError
from spectrochempy.utils.exceptions import ProtocolError
from spectrochempy.utils.file import check_filename_to_open
from spectrochempy.utils.file import get_directory_name
from spectrochempy.utils.file import get_filenames
from spectrochempy.utils.file import pathclean

FILETYPES = [
    ("scp", "SpectroChemPy files (*.scp)"),
    ("omnic", "Nicolet OMNIC files and series (*.spa *.spg *.srs)"),
    ("soc", "Surface Optics Corp. (*.ddr *.hdr *.sdr)"),
    ("labspec", "LABSPEC exported files (*.txt)"),
    ("opus", "Bruker OPUS files (*.[0-9]*)"),
    (
        "topspin",
        "Bruker TOPSPIN fid or series or processed data files "
        "(fid ser 1[r|i] 2[r|i]* 3[r|i]*)",
    ),
    ("matlab", "MATLAB files (*.mat)"),
    ("dso", "Data Set Object files (*.dso)"),
    ("jcamp", "JCAMP-DX files (*.jdx *.dx)"),
    ("csv", "CSV files (*.csv)"),
    ("excel", "Microsoft Excel files (*.xls)"),
    ("zip", "Compressed folder of data files (*.zip)"),
    ("quadera", "Quadera ascii files (*.asc)"),
    ("carroucell", "Carroucell files (*spa)"),
    ("galactic", "GRAMS/Thermo Galactic files (*.spc)"),
    ("wire", "Renishaw WiRE files (*.wdf)"),
    #  ('all', 'All files (*.*)')
]
ALIAS = [
    ("spg", "omnic"),
    ("spa", "omnic"),
    ("ddr", "soc"),
    ("hdr", "soc"),
    ("sdr", "soc"),
    ("spc", "galactic"),
    ("srs", "omnic"),
    ("mat", "matlab"),
    ("txt", "labspec"),
    ("jdx", "jcamp"),
    ("dx", "jcamp"),
    ("xls", "excel"),
    ("asc", "quadera"),
    ("wdf", "wire"),
]
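
# For orientation (illustrative note, not part of the public API): the two tables
# above are turned into lookup dictionaries by `Importer.__init__` below.
# `filetypes` maps a protocol key to its file-dialog description, `protocols` maps
# the description back to the key, and `alias` maps a bare extension to its protocol:
#
#     >>> imp = Importer()
#     >>> imp.filetypes["omnic"]
#     'Nicolet OMNIC files and series (*.spa *.spg *.srs)'
#     >>> imp.alias["spg"]
#     'omnic'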


# --------------------------------------------------------------------------------------
class Importer(HasTraits):
    # Private Importer class

    objtype = Type()
    datasets = List()
    files = Dict()
    default_key = Unicode()
    protocol = Unicode()

    protocols = Dict()
    filetypes = Dict()

    def __init__(self):
        super().__init__()

        self.filetypes = dict(FILETYPES)
        temp = list(zip(*FILETYPES, strict=False))
        temp.reverse()
        self.protocols = dict(zip(*temp, strict=False))

        #  add alias

        self.alias = dict(ALIAS)

    def __call__(self, *args, **kwargs):
        self.datasets = []
        self.default_key = kwargs.pop("default_key", ".scp")

        if (
            "merge" not in kwargs
            and args
            and len(args) == 1
            and isinstance(args[0], list | tuple)
        ):
            # if merge is not specified but the args are provided as a single list,
            # then we are supposed to merge the datasets. If merge is specified, it
            # has priority.
            # This is not useful for 1D datasets, as they are merged automatically
            # when they are compatible.
            kwargs["merge"] = True

        args, kwargs = self._setup_objtype(*args, **kwargs)
        res = check_filename_to_open(*args, **kwargs)
        if res:
            # Normal return
            self.files = res
        else:
            # Cancel in dialog!
            return None

        for key in self.files:
            # particular case of carroucell files
            if key == "" and kwargs.get("protocol") == ["carroucell"]:
                key = ".carroucell"
                self.files = {".carroucell": self.files[""]}

            # particular case of topspin files
            elif key == "" and kwargs.get("protocol") == ["topspin"]:
                key = ".topspin"
                self.files = {".topspin": self.files[""]}

            if key == "frombytes":
                # here we need to read contents
                for filename, content in self.files[key].items():
                    files_ = check_filename_to_open(filename)
                    kwargs["content"] = content
                    key_ = list(files_.keys())[0]
                    self._switch_protocol(key_, files_, **kwargs)
                if len(self.datasets) > 1:
                    self.datasets = self._do_merge(self.datasets, **kwargs)

            elif (
                key
                and key[1:]
                not in list(zip(*FILETYPES, strict=False))[0]
                + list(zip(*ALIAS, strict=False))[0]
            ):
                raise TypeError(f"Filetype `{key}` is unknown in spectrochempy")
            else:
                # here files are read / or remotely from the disk using filenames
                self._switch_protocol(key, self.files, **kwargs)

        # now we reset the preferences for these newly loaded datasets
        if len(self.datasets) > 0:
            if all(nd is None for nd in self.datasets):
                return None

            try:
                prefs = self.datasets[0].preferences
                prefs.reset()
            except (FileNotFoundError, AttributeError):
                pass
        else:
            return None

        if len(self.datasets) == 1:
            nd = self.datasets[0]  # a single dataset is returned
            name = kwargs.pop("name", None)
            if name:
                nd.name = name
            return nd

        nds = self.datasets
        names = kwargs.pop("names", None)
        if names and len(names) == len(nds):
            for nd, name in zip(nds, names, strict=False):
                nd.name = name
        elif names and len(names) != len(nds):
            warn(
                "length of the `names` list and of the list of datasets mismatch - names not applied",
                stacklevel=2,
            )
        return sorted(
            nds,
            key=str,
        )  # return a sorted list (sorted according to their string representation)

    def _setup_objtype(self, *args, **kwargs):
        # check if the first argument is an instance of NDDataset

        args = list(args)
        if (
            args
            and hasattr(args[0], "_implements")
            and args[0]._implements() in ["NDDataset"]
        ):
            # the first arg is an instance of NDDataset
            object = args.pop(0)
            self.objtype = type(object)

        else:
            # by default returned objtype is NDDataset (import here to avoid circular import)
            from spectrochempy.core.dataset.nddataset import NDDataset

            self.objtype = kwargs.pop("objtype", NDDataset)

        return args, kwargs

    def _switch_protocol(self, key, files, **kwargs):
        protocol = kwargs.get("protocol")
        if protocol is not None and protocol != "ALL":
            if not isinstance(protocol, list):
                protocol = [protocol]
            if key and key[1:] not in protocol and self.alias[key[1:]] not in protocol:
                return

        datasets = []
        files[key] = sorted(files[key])  # sort the files according to their names
        for filename in files[key]:
            read_ = getattr(self, f"_read_{key[1:]}")

            dataset = None
            try:
                # read locally, or from a url if filename is a url
                dataset = read_(self.objtype(), filename, **kwargs)

            except (FileNotFoundError, OSError) as exc:
                # the file was not found locally;
                # if it is a url (or local_only was requested) we raise the error
                local_only = kwargs.get("local_only", False)
                if _is_url(filename) or local_only:
                    raise FileNotFoundError from exc

                # else, we try on github
                try:
                    # Try to get the file from github
                    kwargs["read_method"] = read_
                    info_(
                        "File/directory not found locally: Attempt to download it from "
                        "the GitHub repository `spectrochempy_data`...",
                    )
                    dataset = _read_remote(self.objtype(), filename, **kwargs)

                except FileNotFoundError as exc:
                    raise FileNotFoundError from exc

                except Exception as e:
                    warning_(str(e))

            except Exception as e:
                warning_(str(e))

            if dataset is not None:
                if not isinstance(dataset, list):
                    datasets.append(dataset)
                else:
                    datasets.extend(dataset)

        if len(datasets) > 1:
            datasets = self._do_merge(datasets, **kwargs)
            if kwargs.get("merge", False):
                datasets[0].name = pathclean(filename).stem
                datasets[0].filename = pathclean(filename)

        self.datasets.extend(datasets)

    def _do_merge(self, datasets, **kwargs):
        # Decide whether the datasets read from several files should be merged into a
        # single one; the `merge` keyword, when given, has priority over the
        # automatic behaviour.
        merged = kwargs.get("merge", False)
        shapes = list({nd.shape if hasattr(nd, "shape") else None for nd in datasets})
        if len(shapes) == 1 and None not in shapes:
            # homogeneous set of files
            # we can merge them if they are 1D spectra
            if len(shapes[0]) == 1 or shapes[0][0] == 1:
                merged = kwargs.get("merge", True)  # priority to the keyword setting
        else:
            # not homogeneous
            merged = kwargs.get("merge", False)

        if merged:
            # Try to stack the dataset into a single one
            try:
                if datasets[0].ndim == 1:
                    dataset = self.objtype.stack(datasets)
                    dataset.history = "Stacked from several files"
                else:
                    dataset = self.objtype.concatenate(datasets, axis=0)
                    dataset.history = "Merged from several files"

                if dataset.coordset is not None and kwargs.pop("sortbydate", True):
                    dataset.sort(dim=0, inplace=True)
                    #  dataset.history = "Sorted"  (this not always by date:
                    #  actually for now it is by value which can be a date or not)
                datasets = [dataset]

            except DimensionsCompatibilityError as e:
                warn(str(e), stacklevel=2)  # return only the list

        return datasets


def _importer_method(func):
    # Decorator to define a given read function as belonging to Importer
    setattr(Importer, func.__name__, staticmethod(func))
    return func
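
# The protocol-specific readers spread across `spectrochempy.core.readers` use this
# decorator to attach themselves to `Importer` as static methods, so that
# `Importer._switch_protocol` can look them up by name (``_read_<protocol>``).
# A minimal sketch of such a plugin (the ``_read_myformat`` name is hypothetical,
# for illustration only):
#
#     @_importer_method
#     def _read_myformat(*args, **kwargs):
#         dataset, filename = args              # objtype instance and file to read
#         fid, kwargs = _openfid(filename, **kwargs)
#         ...                                   # fill `dataset` from the file content
#         return dataset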


# --------------------------------------------------------------------------------------
# Public Generic Read function
# --------------------------------------------------------------------------------------

_docstring.get_sections(
    r"""
See Also
--------
read : Generic reader inferring protocol from the filename extension.
read_zip : Read Zip archives (containing spectrochempy readable files)
read_dir : Read an entire directory.
read_opus : Read OPUS spectra.
read_labspec : Read Raman LABSPEC spectra (:file:`.txt`).
read_omnic : Read Omnic spectra (:file:`.spa`, :file:`.spg`, :file:`.srs`).
read_soc : Read Surface Optics Corp. files (:file:`.ddr` , :file:`.hdr` or :file:`.sdr`).
read_galactic : Read Galactic files (:file:`.spc`).
read_quadera : Read a Pfeiffer Vacuum's QUADERA mass spectrometer software file.
read_topspin : Read TopSpin Bruker NMR spectra.
read_csv : Read CSV files (:file:`.csv`).
read_jcamp : Read Infrared JCAMP-DX files (:file:`.jdx`, :file:`.dx`).
read_matlab : Read Matlab files (:file:`.mat`, :file:`.dso`).
read_carroucell : Read files in a directory after a carroucell experiment.
read_wire : Read Renishaw WiRE files (:file:`.wdf`).
""",
    sections=["See Also"],
    base="Importer",
)

_docstring.delete_params("Importer.see_also", "read")


@_docstring.dedent
def read(*paths, **kwargs):
    r"""
    Read data from various file formats.

    This method is generally able to load experimental files based on their extensions.

    Parameters
    ----------
    *paths : `str`, `~pathlib.Path` objects or valid URLs, optional
        The data source(s) can be specified by the name or a list of names of the
        file(s) to be loaded:

        - e.g., ( filename1, filename2, ..., kwargs )

        If the list of filenames is enclosed in brackets:

        - e.g., ( [filename1, filename2, ...], kwargs )

        the returned datasets are merged to form a single dataset, except if ``merge``
        is set to `False`.
        If a source is not provided (i.e., no ``paths`` nor ``content``), a dialog box
        will be opened to select files.
    %(kwargs)s

    Returns
    -------
    object : `NDDataset` or list of `NDDataset`
        The returned dataset(s).

    Other Parameters
    ----------------
    protocol : `str`, optional
        ``Protocol`` used for reading. It can be one of {``'scp'``, ``'omnic'``,
        ``'opus'``, ``'topspin'``, ``'matlab'``, ``'jcamp'``, ``'csv'``, ``'excel'``}.
        If not provided, the correct protocol is inferred (whenever it is possible)
        from the filename extension.
    directory : `str` or `~pathlib.Path` object, optional
        Directory from where to read the files.
    merge : `bool`, optional, default: `False`
        If `True` and several filenames or a ``directory`` have been provided as
        arguments, then a single `NDDataset` with merged data (stacked along the
        first dimension) is returned.
    sortbydate : `bool`, optional, default: `True`
        Sort multiple files by acquisition date.
    description : `str`, optional
        A custom description.
    origin : one of {``'omnic'``, ``'tga'``}, optional
        Used when reading with the CSV protocol. In order to properly interpret a CSV
        file, it can be necessary to set the origin of the spectra.
        Up to now, only ``'omnic'`` and ``'tga'`` have been implemented.
    csv_delimiter : `str`, optional, default: `~spectrochempy.preferences.csv_delimiter`
        Set the column delimiter in CSV files.
    content : `bytes` object, optional
        Instead of passing a filename for further reading, a bytes content can be
        directly provided as a bytes object. The most convenient way is to use a
        dictionary. This feature is particularly useful for a GUI Dash application
        to handle drag and drop of files into a browser.
    iterdir : `bool`, optional, default: `True`
        If `True` and no filename was provided, all files present in the provided
        ``directory`` are returned (and merged if ``merge`` is `True`). It is assumed
        that all the files correspond to the current reading protocol.

        .. versionchanged:: 0.6.2
            ``iterdir`` replaces the deprecated ``listdir`` argument.
    recursive : `bool`, optional, default: `False`
        Read also in subfolders.
    replace_existing : `bool`, optional, default: `False`
        Used only when URLs are specified. By default, existing files are not
        replaced, and thus not downloaded again.
    download_only : `bool`, optional, default: `False`
        Used only when URLs are specified. If `True`, only downloading and saving of
        the files is performed, with no attempt to read their content.
    read_only : `bool`, optional, default: `True`
        Used only when URLs are specified. If `True`, saving of the files is performed
        in the current directory, or in the directory specified by the ``directory``
        parameter.

    See Also
    --------
    %(Importer.see_also.no_read)s

    Examples
    --------
    Reading a single OPUS file (providing a Windows-type filename relative to the
    default `~spectrochempy.preferences.datadir`):

    >>> scp.read('irdata\\OPUS\\test.0000')
    NDDataset: [float64] a.u. (shape: (y:1, x:2567))

    Reading a single OPUS file (providing a unix/python-type filename relative to the
    default ``datadir``):

    >>> scp.read('irdata/OPUS/test.0000')
    NDDataset: [float64] a.u. (shape: (y:1, x:2567))

    Single file specified with a pathlib.Path object:

    >>> from pathlib import Path
    >>> folder = Path('irdata/OPUS')
    >>> p = folder / 'test.0000'
    >>> scp.read(p)
    NDDataset: [float64] a.u. (shape: (y:1, x:2567))

    Multiple files not merged (return a list of datasets). Note that a directory is
    specified:

    >>> le = scp.read('test.0000', 'test.0001', 'test.0002', directory='irdata/OPUS')
    >>> len(le)
    3
    >>> le[0]
    NDDataset: [float64] a.u. (shape: (y:1, x:2567))

    Multiple files merged as the `merge` keyword is set to true:

    >>> scp.read('test.0000', 'test.0001', 'test.0002', directory='irdata/OPUS', merge=True)
    NDDataset: [float64] a.u. (shape: (y:3, x:2567))

    Multiple files to merge: they are passed as a list instead of using the keyword
    `merge`:

    >>> scp.read(['test.0000', 'test.0001', 'test.0002'], directory='irdata/OPUS')
    NDDataset: [float64] a.u. (shape: (y:3, x:2567))

    Multiple files not merged: they are passed as a list but `merge` is set to false:

    >>> le = scp.read(['test.0000', 'test.0001', 'test.0002'], directory='irdata/OPUS', merge=False)
    >>> len(le)
    3

    Read without a filename. This has the effect of opening a dialog for file(s)
    selection:

    >>> nd = scp.read()

    Read in a directory (assuming that only OPUS files are present in the directory;
    otherwise we must use the generic `read` function instead):

    >>> le = scp.read(directory='irdata/OPUS')
    >>> len(le)
    2

    Again we can use merge to stack all 4 spectra if they have compatible dimensions:

    >>> scp.read(directory='irdata/OPUS', merge=True)
    [NDDataset: [float64] a.u. (shape: (y:1, x:5549)), NDDataset: [float64] a.u. (shape: (y:4, x:2567))]
    """
    importer = Importer()

    protocol = kwargs.get("protocol")
    available_protocols = list(importer.protocols.values())
    available_protocols.extend(
        list(importer.alias.keys()),
    )  # to handle variants of protocols

    if protocol is None:
        kwargs["filetypes"] = list(importer.filetypes.values())
        kwargs["protocol"] = "ALL"
        default_filter = kwargs.get("default_filter")
        if default_filter is not None:
            kwargs["default_filter"] = importer.filetypes[default_filter]
    else:
        try:
            kwargs["filetypes"] = [importer.filetypes[protocol]]
        except KeyError as e:
            raise ProtocolError(protocol, list(importer.protocols.values())) from e
        except TypeError as e:
            info_(e)

    # deprecated kwargs
    listdir = kwargs.pop("listdir", None)
    if listdir is not None and "iterdir" not in kwargs:
        kwargs["iterdir"] = listdir
        warning_(
            "argument `listdir` is deprecated, use `iterdir` instead",
            category=DeprecationWarning,
        )

    return importer(*paths, **kwargs)
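
# Hedged usage sketch of the `content` mechanism documented above: raw bytes can be
# passed instead of a path, as a GUI upload handler would do. The file path used here
# is illustrative, and this assumes `check_filename_to_open` accepts a
# ``{name: bytes}`` mapping, as suggested by the ``frombytes`` branch of
# `Importer.__call__`:
#
#     >>> p = scp.preferences.datadir / "irdata" / "OPUS" / "test.0000"
#     >>> nd = scp.read({p.name: p.read_bytes()})  # doctest: +SKIP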


# For some reason, _docstring.get_sections modifies the signature of the function
# when used as a decorator, so we use it as a plain function call.
_docstring.get_sections(
    read.__doc__,
    sections=["Parameters", "Other Parameters", "Returns"],
    base="Importer",
)

_docstring.delete_params("Importer.see_also", "read_dir")


@_docstring.dedent
def read_dir(directory=None, **kwargs):
    r"""
    Read an entire directory.

    Open a list of readable files in a directory and store data/metadata in a dataset
    or a list of datasets according to the following rules:

    - 2D spectroscopic data (e.g., valid .spg files or matlab arrays, etc...) from
      distinct files are stored in distinct `NDDataset`\ s.
    - 1D spectroscopic data (e.g., :file:`.spa` files) in a given directory are merged
      into a single `NDDataset`, provided their unique dimension is compatible.
      If not, an error is generated.
    - Non-readable files are ignored.

    Parameters
    ----------
    directory : `str` or `~pathlib.Path`
        Folder where the files to read are located.

    Returns
    -------
    %(Importer.returns)s
        Depending on the Python version, the order of the datasets in the list may
        change.

    See Also
    --------
    %(Importer.see_also.no_read_dir)s

    Examples
    --------
    >>> scp.preferences.csv_delimiter = ','
    >>> A = scp.read_dir('irdata')
    >>> len(A)
    4

    >>> B = scp.read_dir()
    """
    kwargs["iterdir"] = True
    importer = Importer()
    return importer(directory, **kwargs)
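
# A hedged sketch building on the examples above: `read_dir` forwards its keyword
# arguments to `Importer`, so the `recursive` option documented for `read` should
# also apply here to walk subfolders (the path is the same test directory as above):
#
#     >>> C = scp.read_dir('irdata', recursive=True)  # doctest: +SKIP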


# _docstring.delete_params("Importer.see_also", "read_remote")
# @_docstring.dedent
# def read_remote(file_or_dir, **kwargs):
#     """
#     Download and read files or an entire directory from any url.
#
#     The first usage in spectrochempy is the loading of test files in the
#     `spectrochempy_data repository <https://github.com/spectrochempy/spectrochempy_data>`__.
#     This is done only if the data are not yet downloaded and present in the
#     `~spectrochempy.preferences.datadir` directory.
#
#     It can also be used to download and read a file or directory from any url.
#
#     Parameters
#     ----------
#     path : `str`, `~pathlib.Path` object or an url.
#         When a file or folder is specified, it must be written as if it were present
#         locally, exactly as for the `read` function. The corresponding file or
#         directory is downloaded from the ``github spectrochempy_data`` repository.
#         Otherwise it should be a full and valid url.
#     %(kwargs)s
#
#     Returns
#     -------
#     %(Importer.returns)s
#
#     Other Parameters
#     ----------------
#     %(Importer.other_parameters)s
#
#     See Also
#     --------
#     %(Importer.see_also.no_read_remote)s
#
#     Examples
#     --------
#
#     >>> A = scp.read_remote('irdata/subdir')
#     """
#     kwargs["remote"] = True
#     importer = Importer()
#     return importer(file_or_dir, **kwargs)


# ======================================================================================
# Private read functions
# ======================================================================================
@_importer_method
def _read_dir(*args, **kwargs):
    _, directory = args
    directory = get_directory_name(directory)
    files = get_filenames(directory, **kwargs)
    datasets = []
    valid_extensions = (
        list(zip(*FILETYPES, strict=False))[0] + list(zip(*ALIAS, strict=False))[0]
    )
    for key in [key for key in files if key[1:] in valid_extensions]:
        if key:
            importer = Importer()
            nd = importer(files[key], **kwargs)
            if nd is not None:
                if not isinstance(nd, list):
                    nd = [nd]
                datasets.extend(nd)
    return datasets


@_importer_method
def _read_scp(*args, **kwargs):
    dataset, filename = args
    return dataset.load(filename, **kwargs)


@_importer_method
def _read_(*args, **kwargs):
    dataset, filename = args

    if kwargs.pop("remote", False):
        return Importer._read_remote(*args, **kwargs)
    if not filename or filename.is_dir():
        return Importer._read_dir(*args, **kwargs)
    raise FileNotFoundError

    # protocol = kwargs.get("protocol", None)
    # if protocol and ".scp" in protocol:
    #     return dataset.load(filename, **kwargs)
    #
    # elif filename and filename.name in ("fid", "ser", "1r", "2rr", "3rrr"):
    #     # probably a Topspin NMR file
    #     return dataset.read_topspin(filename, **kwargs)
    # elif filename:
    #     # try scp format
    #     try:
    #         return dataset.load(filename, **kwargs)
    #     except Exception:
    #         # let's try some common formats
    #         for key in ["omnic", "opus", "topspin", "labspec", "matlab", "jdx"]:
    #             try:
    #                 _read = getattr(dataset, f"read_{key}")
    #                 f = f"{filename}.{key}"
    #                 return _read(f, **kwargs)
    #             except Exception:
    #                 pass
    #         raise NotImplementedError


# ======================================================================================
# Private functions
# ======================================================================================
def _is_url(filename):
    return (
        isinstance(filename, str)
        and re.match(r"http[s]?:[\/]{2}", filename) is not None
    )


def _openfid(filename, mode="rb", **kwargs):
    # Return a file ID

    # Check if content has been passed
content = kwargs.get("content", False) # default encoding encoding = "utf-8" if _is_url(filename): # by default, we set the read_only flag to True when reading remote url kwargs["read_only"] = kwargs.get("read_only", True) # use request to read the remote content r = requests.get(filename, allow_redirects=True, timeout=10) r.raise_for_status() content = r.content encoding = r.encoding else: # Transform filename to a Path object is not yet the case filename = pathclean(filename) # Create the file ID if content: # if a content has been passed, then it has priority fid = ( io.BytesIO(content) if mode == "rb" else io.StringIO(content.decode(encoding)) ) else: fid = open(filename, mode=mode) # noqa: SIM115 return fid, kwargs def _write_downloaded_file(content, dst): if not dst.parent.exists(): # create the eventually missing subdirectory dst.parent.mkdir(parents=True, exist_ok=True) dst.write_bytes(content) info_(f"{dst.name} has been downloaded and written in {dst.parent}") def _get_url_content_and_save(url, dst, replace, read_only=False): if not replace and dst.exists(): return None try: r = requests.get(url, allow_redirects=True, timeout=10) r.raise_for_status() # write downloaded file if not read_only: _write_downloaded_file(r.content, dst) # in all case return the content return r.content except OSError: raise FileNotFoundError(f"Not found locally or at url: {url}") from None def _download_full_testdata_directory(): from spectrochempy.core import preferences as prefs datadir = prefs.datadir url = "https://github.com/spectrochempy/spectrochempy_data/archive/refs/heads/master.zip" resp = requests.get(url, timeout=10) zipfile = ZipFile(io.BytesIO(resp.content)) files = [zipfile.open(file_name) for file_name in zipfile.namelist()] for file in files: name = file.name if name.endswith("/") or "testdata/" not in name: # dir continue uncompressed = zipfile.read(name) p = list(pathclean(name).parts)[2:] dst = datadir.joinpath("/".join(p)) _write_downloaded_file(uncompressed, dst) def _download_from_github(path, dst, replace=False): # download on github (always save the downloaded files) relative_path = str(pathclean(path).as_posix()) path = ( f"https://github.com/spectrochempy/spectrochempy_data/raw/master/" f"testdata/{relative_path}" ) # first determine if it is a directory r = requests.get(path + "/__index__", allow_redirects=True, timeout=10) index = None if r.status_code == 200: index = yaml.safe_load(r.content) if index is None: return _get_url_content_and_save(path, dst, replace) # download folder for filename in index["files"]: _get_url_content_and_save(f"{path}/{filename}", dst / filename, replace) for folder in index["folders"]: _download_from_github(f"{relative_path}/{folder}", dst / folder) return None def _is_relative_to(path, base): # try to emulate the pathlib is_relative_to method which does not work on python # 3.7 (needed for Colab!) 
    # TODO: check as Colab is updated to 3.10
    pparts = path.parts
    bparts = base.parts
    if bparts[-1] in pparts:
        idx = pparts.index(bparts[-1])
        pparts_base = pparts[: idx + 1]
        return pparts_base == bparts
    return False


def _relative_to(path, base):
    pparts = path.parts
    bparts = base.parts
    if bparts[-1] in pparts:
        idx = pparts.index(bparts[-1])
        return pathclean("/".join(pparts[idx + 1 :]))
    raise ValueError(
        f"'{path}' is not in the subpath of '{base}' OR one path is "
        f"relative and the other absolute.",
    )


@_importer_method
def _read_remote(*args, **kwargs):
    from spectrochempy.core import preferences as prefs

    datadir = prefs.datadir

    dataset, path = args
    kwargs["merge"] = kwargs.get("merge", False)  # by default, no attempt to merge
    read_method = kwargs.pop("read_method", read)
    download_only = kwargs.pop("download_only", False)
    replace = kwargs.pop(
        "replace_existing",
        False,
    )  # by default we download only if the file is not yet downloaded

    # we try to download the github testdata
    path = pathclean(path)

    # we need to download additional files for topspin
    topspin = "topspin" in read_method.__name__

    # we have to treat a special case: topspin, where the parent directory needs
    # to be downloaded with the required file
    if topspin:
        savedpath = path
        m = re.match(r"(.*)(\/pdata\/\d+\/\d+[r|i]{1,2}|ser|fid)", str(path))
        if m is not None:
            path = pathclean(m[1])

    if _is_relative_to(path, datadir):
        # try to make it relative for remote downloading on github
        relative_path = _relative_to(path, datadir)
    else:
        # assume it is already relative
        relative_path = path

    # Try to download it
    dst = datadir / relative_path

    if dst.name == "testdata":
        # we are going to download the whole testdata directory
        # -> use a faster method
        _download_full_testdata_directory()
        return None

    content = _download_from_github(relative_path, dst, replace)

    if not download_only:
        if content is None:
            if topspin:
                return read_method(
                    dataset,
                    dst / _relative_to(savedpath, dst),
                    **kwargs,
                )
            return read_method(dataset, dst, **kwargs)
        return read_method(dataset, dst, content=content, **kwargs)
    return None
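
# End-to-end sketch of the remote fallback implemented above (illustrative; requires
# network access, and the file name is hypothetical): when a path below
# `preferences.datadir` is missing locally, `Importer._switch_protocol` calls
# `_read_remote`, which makes the path relative to `datadir`, downloads it from the
# `spectrochempy_data` GitHub repository via `_download_from_github`, writes it under
# `datadir`, and finally hands it back to the original read method.
#
#     >>> nd = scp.read("irdata/some_missing_file.spg")  # doctest: +SKIP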