Source code for spectrochempy.utils.file

# ======================================================================================
# Copyright (©) 2015-2025 LCS - Laboratoire Catalyse et Spectrochimie, Caen, France.
# CeCILL-B FREE SOFTWARE LICENSE AGREEMENT
# See full LICENSE agreement in the root directory.
# ======================================================================================
"""File utilities."""

import importlib.util
import re
import struct
import warnings
from os import environ
from pathlib import Path
from pathlib import PosixPath
from pathlib import WindowsPath

import numpy as np


# ======================================================================================
# Utility functions
# ======================================================================================
def download_testdata():
    """
    Download the test data into the preferences data directory, once.

    A marker file named ``__downloaded__`` is created in the data directory
    after the download call returns; when that marker already exists the
    function returns without doing anything.
    """
    from spectrochempy.core import preferences
    from spectrochempy.core.readers.importer import read
    from spectrochempy.utils.file import pathclean

    target_dir = pathclean(preferences.datadir)
    marker = target_dir / "__downloaded__"

    # The download is relatively long: skip it when it was already done.
    if marker.exists():
        return

    read(target_dir, download_only=True)
    marker.touch(exist_ok=True)


def is_editable_install(package_name):
    """
    Check if a package is installed in editable mode.

    The check is a path heuristic: the package is reported as editable when
    the origin of its import spec contains ``<package_name>/src``.

    Parameters
    ----------
    package_name : str
        The name of the package to check.

    Returns
    -------
    bool
        True if the package is installed in editable mode, False otherwise
        (including when the package cannot be found or has no file origin).
    """
    spec = importlib.util.find_spec(package_name)
    if spec is None or spec.origin is None:
        # Not importable, or no file origin (e.g. a namespace package):
        # nothing to inspect, so it cannot match the heuristic.
        return False
    # Normalise Windows separators so the "/src" test works on every platform.
    origin = spec.origin.replace("\\", "/")
    return f"{package_name}/src" in origin


def get_repo_path():
    """
    Get the repository path based on the installation mode.

    Returns
    -------
    Path
        The directory four levels above this file when ``spectrochempy`` is
        detected as an editable install, two levels above it otherwise.
    """
    levels_up = 4 if is_editable_install("spectrochempy") else 2
    location = Path(__file__)
    for _ in range(levels_up):
        location = location.parent
    return location


def fromfile(fid, dtype, count):
    """
    Read `count` binary values of type `dtype` from a file-like object.

    Replacement for `numpy.fromfile` usable with in-memory streams such as
    `io.BytesIO`. Values are decoded with `struct` using native byte order.

    Parameters
    ----------
    fid : file-like
        Object with a ``read(nbytes)`` method returning bytes.
    dtype : str
        One of ``"uint8"``, ``"int8"``, ``"uint16"``, ``"int16"``,
        ``"uint32"``, ``"int32"``, ``"uint64"``, ``"int64"``, ``"float32"``,
        ``"float64"`` or ``"char8"``.
    count : int
        Number of items to read.

    Returns
    -------
    scalar or numpy.ndarray
        The single decoded value when exactly one item is read, otherwise an
        array built from the decoded values.

    Raises
    ------
    KeyError
        If `dtype` is not one of the supported names.
    struct.error
        If the stream does not provide enough bytes.
    """
    struct_codes = {
        "uint8": "B",
        "int8": "b",
        "uint16": "H",
        "int16": "h",
        "uint32": "I",
        "int32": "i",
        "uint64": "Q",
        "int64": "q",
        "float32": "f",
        "float64": "d",
        "char8": "c",
    }
    fmt = struct_codes[dtype] * count
    # Let struct compute the byte length instead of guessing it from the
    # dtype name; this also covers the 64-bit types.
    nbytes = struct.calcsize(fmt)

    out = struct.unpack(fmt, fid.read(nbytes))
    if len(out) == 1:
        return out[0]
    return np.array(out)


def _insensitive_case_glob(pattern):
    def either(c):
        return f"[{c.lower()}{c.upper()}]" if c.isalpha() else c

    return "".join(map(either, pattern))


def patterns(filetypes, allcase=True):
    """
    Extract the glob patterns contained in file type descriptions.

    Parameters
    ----------
    filetypes : str, list or tuple of str
        Descriptions such as ``"OMNIC files (*.spa *.spg)"``.
    allcase : bool, optional, default=True
        If True, each extracted pattern is made case-insensitive with
        `_insensitive_case_glob`.

    Returns
    -------
    list of str
        The glob patterns, in order of appearance.
    """
    glob_regex = r"\*\.*\[*[0-9-]*\]*\w*\**"

    if isinstance(filetypes, (list, tuple)):  # noqa: UP038
        descriptions = filetypes
    else:
        descriptions = [filetypes]

    found = [
        hit.group(0)
        for description in descriptions
        for hit in re.finditer(glob_regex, description)
    ]

    if allcase:
        return [_insensitive_case_glob(item) for item in found]
    return found


def pathclean(paths):
    """
    Clean a path or a series of path.

    The aim is to be compatible with windows and unix-based system.

    Parameters
    ----------
    paths : `str` or a `list` of `str`
        Path to clean. It may contain Windows or conventional python separators.

    Returns
    -------
    pathlib or list of pathlib
        Cleaned path(s). `None` and inputs of any other type are returned
        unchanged; in a list or tuple only the `str` items are converted.
    """
    import platform

    # Backslashes, and the control characters obtained when a Windows-style
    # path such as "C:\temp\new" was written in a non-raw string literal.
    replacements = (
        ("\\", "/"),
        ("\n", "/n"),
        ("\t", "/t"),
        ("\b", "/b"),
        ("\a", "/a"),
    )

    def _to_path(text):
        if "Windows" in platform.platform():
            return Path(WindowsPath(text))  # pragma: no cover
        # some replacement so we can handle window style path on unix
        text = text.strip()
        for found, substitute in replacements:
            text = text.replace(found, substitute)
        return Path(PosixPath(text))

    if paths is None:
        return paths

    if isinstance(paths, (str, Path, PosixPath, WindowsPath)):  # noqa: UP038
        return _to_path(str(paths)).expanduser()

    if isinstance(paths, (list, tuple)):  # noqa: UP038
        return [
            _to_path(item).expanduser() if isinstance(item, str) else item
            for item in paths
        ]

    return paths
def _get_file_for_protocol(f, **kwargs):
    """
    Look for a sibling of `f` whose extension matches the requested protocol.

    The ``protocol`` keyword may be a string or an iterable of strings. The
    string ``"ALL"`` is mapped to the glob ``"*"`` and ``"opus"`` to
    ``"*.0*"``. Each entry is used as the extension part of a glob
    ``<f.stem>.<entry>`` in the parent directory of `f`.

    Returns
    -------
    pathlib.Path or None
        The first match found, or None when no ``protocol`` keyword is given
        or when nothing matches.
    """
    protocol = kwargs.get("protocol")
    if protocol is not None:
        if isinstance(protocol, str):
            if protocol in ["ALL"]:
                protocol = "*"
            if protocol in ["opus"]:
                protocol = "*.0*"
            protocol = [protocol]

        lst = []
        for p in protocol:
            lst.extend(list(f.parent.glob(f"{f.stem}.{p}")))
        if not lst:
            return None
        return f.parent / lst[0]
    return None


def check_filenames(*args, **kwargs):
    """
    Return a list or a dictionary of filenames.

    Parameters
    ----------
    *args
        If passed it is a str, a list of str or a dictionary containing filenames or a
        byte's contents.
    **kwargs
        Optional keywords parameters. See Other parameters

    Returns
    -------
    tuple, dict, list or None
        - `args` unchanged when the first argument is an ``http://`` or
          ``https://`` string;
        - a dictionary ``{no_name_<i>: bytes}`` when bytes contents are passed;
        - the dictionary itself when a dictionary is passed;
        - ``{filename: content}`` when the ``content`` keyword is truthy;
        - otherwise a list of paths, or whatever `get_filenames` returned
          (a dictionary or None) when no filename was specified.

    Other Parameters
    ----------------
    filename :
        Used (and removed from `kwargs`) when no filename was found in `args`.
    filetypes :
        Passed to `get_filenames` when no filename was specified
        (default ``["all files (*)"]``).
    directory :
        Passed to `get_filenames` when no filename was specified.
    content :
        If truthy, returned in a dictionary keyed by the first filename
        (or ``no_name``).
    protocol :
        Forwarded to `_get_file_for_protocol`; a value containing
        ``"topspin"`` enables the directory handling of
        `_topspin_check_filename`.
    processed :
    expno :
    procno :
    iterdir :
    glob :
        Forwarded through `kwargs` to `_topspin_check_filename`.

    See Also
    --------
    check_filename_to_open
    check_filename_to_save
    """
    # from spectrochempy.application import info_
    from spectrochempy.core import preferences as prefs

    datadir = pathclean(prefs.datadir)

    filenames = None

    if args:
        if (
            isinstance(args[0], str)
            and (args[0].startswith("http://") or args[0].startswith("https://"))
            # and kwargs.get("remote")
        ):
            # return url
            return args
        if isinstance(args[0], (str, Path, PosixPath, WindowsPath)):  # noqa: UP038
            # one or several filenames are passed - make Path objects
            filenames = pathclean(args)
        elif isinstance(args[0], bytes):
            # in this case, one or several byte contents has been passed instead of filenames
            # as filename where not given we passed the 'unnamed' string
            # return a dictionary
            return {pathclean(f"no_name_{i}"): arg for i, arg in enumerate(args)}
        elif isinstance(args[0], list) and isinstance(  # noqa: UP038
            args[0][0], (str, Path, PosixPath, WindowsPath)
        ):
            filenames = pathclean(args[0])
        elif isinstance(args[0], list) and isinstance(args[0][0], bytes):
            return {pathclean(f"no_name_{i}"): arg for i, arg in enumerate(args[0])}
        elif isinstance(args[0], dict):
            # return directly the dictionary
            return args[0]

    if not filenames:
        # look into keywords (only the case where a str or pathlib filename is given are
        # accepted)
        filenames = kwargs.pop("filename", None)
        filenames = [pathclean(filenames)] if pathclean(filenames) is not None else None

    # Look for content in kwargs
    content = kwargs.pop("content", None)
    if content:
        if not filenames:
            filenames = [pathclean("no_name")]
        return {filenames[0]: content}

    if not filenames:
        # no filename specified open a dialog
        filetypes = kwargs.pop("filetypes", ["all files (*)"])
        directory = pathclean(kwargs.pop("directory", None))
        filenames = get_filenames(
            directory=directory,
            dictionary=True,
            filetypes=filetypes,
            **kwargs,
        )
    if filenames and not isinstance(filenames, dict):
        filenames_ = []
        for filename in filenames:
            # in which directory ?
            directory = filename.parent

            # a file given without directory part (or located in the current
            # working directory) is treated as having no explicit directory
            if directory.resolve() == Path.cwd() or directory == Path():
                directory = ""
            kw_directory = pathclean(kwargs.get("directory"))
            if directory and kw_directory and directory != kw_directory:
                # conflict we do not take into account the kw.
                warnings.warn(
                    "Two different directory where specified (from args and keywords arg). "
                    "Keyword `directory` will be ignored!",
                    stacklevel=2,
                )
            elif not directory and kw_directory:
                filename = pathclean(kw_directory / filename)

            # check if the file exists here
            if not directory or str(directory).startswith("."):
                # search first in the current directory
                directory = Path.cwd()

            f = pathclean(directory / filename)

            # lookup order: exact path, then protocol-based sibling, then the
            # same two attempts relative to the preferences data directory
            fexist = f if f.exists() else _get_file_for_protocol(f, **kwargs)
            # info_(f"fexist {fexist}")
            if fexist is None:
                f = pathclean(datadir / filename)
                # info_(f"f (line 255) {f}")
                fexist = f if f.exists() else _get_file_for_protocol(f, **kwargs)
                # info_(f"fexist {fexist}")
            if fexist:
                filename = fexist

            # Particular case for topspin where filename can be provided
            # as a directory only
            # NOTE(review): if `protocol` is passed explicitly as None, the
            # `in` test below would raise TypeError - confirm callers never do so.
            if filename.is_dir() and "topspin" in kwargs.get("protocol", []):
                filename = _topspin_check_filename(filename, **kwargs)

            if not isinstance(filename, list):
                filename = [filename]

            filenames_.extend(filename)

        filenames = filenames_

    return filenames


def _topspin_check_filename(filename, **kwargs):
    """
    Return the list of topspin data files found under the directory `filename`.

    With ``iterdir`` truthy or a ``glob`` keyword given, files are collected
    by globbing (the ``glob`` pattern if truthy, else ``**/ser`` and
    ``**/fid``, or ``**/1r``, ``**/2rr`` and ``**/3rrr`` when ``processed``
    is truthy). Otherwise a single file is selected from the ``expno`` and
    ``procno`` keywords. Only items named ``fid``, ``ser``, ``1r``, ``2rr``
    or ``3rrr`` are kept in the returned list.
    """
    if kwargs.get("iterdir", False) or kwargs.get("glob") is not None:
        # when we list topspin dataset we have to read directories, not directly files
        # we can retrieve them using glob patterns
        glob = kwargs.get("glob")
        if glob:
            files_ = list(filename.glob(glob))
        elif not kwargs.get("processed", False):
            files_ = list(filename.glob("**/ser"))
            files_.extend(list(filename.glob("**/fid")))
        else:
            files_ = list(filename.glob("**/1r"))
            files_.extend(list(filename.glob("**/2rr")))
            files_.extend(list(filename.glob("**/3rrr")))
    else:
        expno = kwargs.pop("expno", None)
        procno = kwargs.pop("procno", None)

        if expno is None:
            # default to the first (sorted) entry whose name starts with a digit
            expnos = sorted(filename.glob("[0-9]*"))
            expno = expnos[0] if expnos else expno

        # read a fid or a ser
        if procno is None:
            f = filename / str(expno)
            files_ = [f / "ser"] if (f / "ser").exists() else [f / "fid"]

        else:
            # get the adsorption spectrum
            f = filename / str(expno) / "pdata" / str(procno)
            if (f / "3rrr").exists():
                files_ = [f / "3rrr"]
            elif (f / "2rr").exists():
                files_ = [f / "2rr"]
            else:
                files_ = [f / "1r"]

    # depending on the glob patterns too many files may have been selected : restriction to the valid subset
    filename = []
    for item in files_:
        if item.name in ["fid", "ser", "1r", "2rr", "3rrr"]:
            filename.append(item)

    return filename


def get_filenames(*filenames, **kwargs):
    """
    Return a list or dictionary of the filenames of existing files, filtered by extensions.

    Parameters
    ----------
    filenames : `str` or pathlib object, `tuple` or `list` of strings of pathlib object, optional.
        A filename or a list of filenames.
        If not provided, a dialog box is opened to select files in the current
        directory if no `directory` is specified).
    **kwargs
        Other optional keyword parameters. See Other Parameters.

    Returns
    -------
    out
        List of filenames, or a dictionary mapping a lower-case extension
        (``".topspin"`` and ``".opus"`` being assigned for specific names)
        to a list of filenames when `dictionary` is True and the protocol is
        not ``["topspin"]``. None is returned when nothing was selected.

    Raises
    ------
    ValueError
        If the filenames do not share the same parent directory.
    OSError
        If a filename cannot be found, or if an element of the resulting list
        is not a path object.

    Other Parameters
    ----------------
    directory : `str` or pathlib object, optional.
        The directory where to look at. If not specified, read in
        current directory, or in the datadir if unsuccessful.
    filetypes : `list` , optional, default=['all files (*)'].
        File type filter. The keyword ``filters`` is accepted as an alias.
    dictionary : `bool` , optional, default=True
        Whether a dictionary or a list should be returned.
    iterdir : bool, default=False
        Read all file (possibly limited by `filetypes` in a given `directory` .
    recursive : bool, optional,  default=False.
        Read also subfolders.

    Warnings
    --------
    if several filenames are provided in the arguments,
    they must all reside in the same directory!
    """
    from spectrochempy import NO_DIALOG
    from spectrochempy.core import preferences as prefs

    NODIAL = (
        (NO_DIALOG or "DOC_BUILDING" in environ) and "KEEP_DIALOGS" not in environ
    )  # flag to suppress dialog when doc is built or during full testing

    # allowed filetypes
    # -----------------
    # alias filetypes and filters as both can be used
    filetypes = kwargs.get("filetypes", kwargs.get("filters", ["all files (*)"]))

    # filenames
    # ---------
    if len(filenames) == 1 and isinstance(filenames[0], (list, tuple)):  # noqa: UP038
        filenames = filenames[0]

    filenames = pathclean(list(filenames))

    directory = None
    if len(filenames) == 1:
        # check if it is a directory
        try:
            f = get_directory_name(filenames[0])
        except OSError:
            f = None
        if f and f.is_dir():
            # this specify a directory not a filename
            directory = f
            filenames = None
            NODIAL = True
    # else:
    #    filenames = pathclean(list(filenames))

    # directory
    # ---------
    kw_dir = pathclean(kwargs.pop("directory", None))
    if directory is None:
        directory = kw_dir

    if directory is not None:
        if filenames:
            # prepend to the filename (incompatibility between filename and directory specification
            # will result to a error
            filenames = [pathclean(directory / filename) for filename in filenames]
        else:
            directory = get_directory_name(directory)

    # check the parent directory
    # all filenames must reside in the same directory
    if filenames:
        parents = set()
        for f in filenames:
            parents.add(f.parent)
        if len(parents) > 1:
            raise ValueError(
                "filenames provided have not the same parent directory. "
                "This is not accepted by the readfilename function.",
            )

        # use get_directory_name to complete eventual missing part of the absolute path
        directory = get_directory_name(parents.pop())

        filenames = [filename.name for filename in filenames]

    # now proceed with the filenames
    if filenames:
        # look if all the filename exists either in the specified directory,
        # else in the current directory, and finally in the default preference data directory
        temp = []
        for _i, filename in enumerate(filenames):
            if not (pathclean(directory / filename)).exists():
                # the filename provided doesn't exists in the working directory
                # try in the data directory
                # NOTE(review): `directory` is rebound here, so every following
                # filename is searched relative to the data directory only -
                # confirm this is intended.
                directory = pathclean(prefs.datadir)
                if not (pathclean(directory / filename)).exists():
                    raise OSError(f"Can't find this filename {filename}")
            temp.append(directory / filename)

        # now we have checked all the filename with their correct location
        filenames = temp

    else:
        # no filenames:
        # open a file dialog
        # except if a directory is specified or iterdir is True.

        getdir = kwargs.get(
            "iterdir",
            directory is not None or kwargs.get("protocol") == ["topspin"],
            # or kwargs.get("protocol", None) == ["carroucell"],
        )

        if not getdir:
            # we open a dialogue to select one or several files manually
            if not NODIAL:
                from spectrochempy.core.common.dialogs import open_dialog

                filenames = open_dialog(
                    single=False,
                    directory=directory,
                    filters=filetypes,
                    **kwargs,
                )
                if not filenames:
                    # cancel
                    return None
            elif environ.get("TEST_FILE", None) is not None:
                # happen for testing
                filenames = [pathclean(environ.get("TEST_FILE"))]

        else:
            if not NODIAL:
                from spectrochempy.core.common.dialogs import open_dialog

                directory = open_dialog(
                    directory=directory,
                    filters="directory",
                    **kwargs,
                )
                if not directory:
                    # cancel
                    return None

            elif NODIAL and not directory:
                directory = get_directory_name(environ.get("TEST_FOLDER"))

            elif NODIAL and kwargs.get("protocol") == ["topspin"]:
                directory = get_directory_name(environ.get("TEST_NMR_FOLDER"))

            if directory is None:
                return None

            filenames = []

            if kwargs.get("protocol") != ["topspin"]:
                # automatic reading of the whole directory
                for pat in patterns(filetypes):
                    if kwargs.get("recursive", False):
                        pat = f"**/{pat}"
                    filenames.extend(list(directory.glob(pat)))
            else:
                # Topspin directory detection
                filenames = [directory]

            # on mac case insensitive OS this cause doubling the number of files.
            # Eliminates doublons:
            filenames = list(set(filenames))

            filenames = pathclean(filenames)

        if not filenames:
            # problem with reading?
            return None

    # now we have either a list of the selected files
    if isinstance(filenames, list) and not all(
        isinstance(elem, (Path, PosixPath, WindowsPath))  # noqa: UP038
        for elem in filenames  # noqa: UP038
    ):
        raise OSError("one of the list elements is not a filename!")

    # or a single filename
    if isinstance(filenames, (str, Path, PosixPath, WindowsPath)):  # noqa: UP038
        filenames = [filenames]

    filenames = pathclean(filenames)
    # iterate over a copy because items are removed from the list in the loop
    for filename in filenames[:]:
        if filename.name.endswith(".DS_Store"):
            # sometime present in the directory (MacOSX)
            filenames.remove(filename)

    dictionary = kwargs.get("dictionary", True)
    protocol = kwargs.get("protocol")
    if dictionary and protocol != ["topspin"]:
        # make and return a dictionary
        filenames_dict = {}
        for filename in filenames:
            if filename.is_dir() and protocol != ["carroucell"]:
                continue
            extension = filename.suffix.lower()
            if not extension:
                if re.match(r"^fid$|^ser$|^[1-3][ri]*$", filename.name) is not None:
                    extension = ".topspin"
            elif extension[1:].isdigit():
                # probably an opus file
                extension = ".opus"
            if extension in filenames_dict:
                filenames_dict[extension].append(filename)
            else:
                filenames_dict[extension] = [filename]
        return filenames_dict
    return filenames


def find_or_create_spectrochempy_dir():
    """
    Return the ``.spectrochempy`` directory of the user's home, creating it if needed.

    Returns
    -------
    pathlib.Path
        The path ``Path.home() / ".spectrochempy"``.

    Raises
    ------
    OSError
        If the path exists but is a file.
    """
    directory = Path.home() / ".spectrochempy"

    directory.mkdir(exist_ok=True)  # Create directory only if it does not exist

    # NOTE(review): per the pathlib documentation, `mkdir(exist_ok=True)` still
    # raises FileExistsError when the path is an existing file, so this check
    # is presumably never reached - confirm.
    if directory.is_file():  # pragma: no cover
        msg = "Intended SpectroChemPy directory `{0}` is actually a file."
        raise OSError(msg.format(directory))

    return directory


def get_directory_name(directory, **kwargs):
    """
    Return a valid directory name.

    Parameters
    ----------
    directory : `str` or `pathlib.Path` object, optional.
        A directory name. If not provided, a dialog box is opened to select a directory.
    **kwargs
        Passed to the directory selection dialog, when one is opened.

    Returns
    -------
    out: `pathlib.Path` object
        valid directory name. A given `directory` is looked up as is, then
        relative to the current working directory, then relative to the
        preferences data directory. Without `directory`, the result of the
        dialog is returned, or the data directory when dialogs are disabled.

    Raises
    ------
    OSError
        If `directory` is given but is not found in any of these locations.
    """
    from spectrochempy import NO_DIALOG
    from spectrochempy.core import preferences as prefs

    data_dir = pathclean(prefs.datadir)
    working_dir = Path.cwd()

    directory = pathclean(directory)

    if directory:
        # Search locally
        if directory.is_dir():
            # nothing else to do
            return directory

        if (working_dir / directory).is_dir():
            # if no parent directory: look at current working dir
            return working_dir / directory

        if (data_dir / directory).is_dir():
            return data_dir / directory

        raise OSError(f'"{directory!s}" is not a valid directory')
        # warnings.warn(f'"{directory}" is not a valid directory')
        # return None

    # open a file dialog
    directory = data_dir
    if not NO_DIALOG:  # this is for allowing test to continue in the background
        from spectrochempy.core.common.dialogs import open_dialog

        directory = open_dialog(
            single=False,
            directory=working_dir,
            filters="directory",
            **kwargs,
        )

    return pathclean(directory)


def check_filename_to_save(
    dataset,
    filename=None,
    save_as=False,
    confirm=True,
    **kwargs,
):
    """
    Determine the filename to use for saving `dataset`.

    Parameters
    ----------
    dataset : object
        Object whose ``name`` attribute is used to build a default filename.
    filename : `str` or `pathlib.Path` object, optional
        Requested filename.
    save_as : bool, optional, default=False
        If True, the save dialog branch is entered even when `filename` is
        given.
    confirm : bool, optional, default=True
        For an existing file: if True a confirmation dialog may be opened,
        otherwise an information message is emitted and no dialog is opened.
    **kwargs
        ``suffix`` (default ``".scp"``) is appended to the default filename;
        ``caption`` and ``filetypes`` are removed from `kwargs` and passed to
        the save dialog together with the remaining keywords.

    Returns
    -------
    pathlib.Path or None
        The cleaned filename, or None when the save dialog returned None.
    """
    from spectrochempy import NO_DIALOG
    from spectrochempy.application import info_

    # dialogs are suppressed when NO_DIALOG is set or the doc is being built,
    # unless KEEP_DIALOGS is present in the environment
    NODIAL = (NO_DIALOG or "DOC_BUILDING" in environ) and "KEEP_DIALOGS" not in environ

    filename = pathclean(filename)

    if filename and pathclean(filename).parent.resolve() == Path.cwd():
        filename = Path.cwd() / filename

    if not filename or save_as or filename.exists():
        from spectrochempy.core.common.dialogs import save_dialog

        # no filename provided
        open_diag = True
        caption = "Save as ..."
        if filename is None or (NODIAL and pathclean(filename).is_dir()):
            filename = dataset.name
            filename = filename + kwargs.get("suffix", ".scp")

        # existing filename provided
        elif filename.exists():
            if confirm:
                caption = "File exists. Confirm overwrite"
            else:
                info_(f"A file {filename} was present and has been overwritten.")
                open_diag = False

        if not NODIAL and open_diag:
            filename = save_dialog(
                caption=kwargs.pop("caption", caption),
                filename=filename,
                filters=kwargs.pop("filetypes", ["All file types (*.*)"]),
                **kwargs,
            )
            if filename is None:
                # this is probably due to a cancel action for an open dialog.
                return None

    return pathclean(filename)


def check_filename_to_open(*args, **kwargs):
    """
    Determine the file(s) to open from the arguments.

    The arguments are passed to `check_filenames`.

    Returns
    -------
    dict or None
        - None when `check_filenames` returned None;
        - ``{key: filenames}`` when a list (or tuple) of filenames was
          obtained, `key` being the lower-case suffix of the first item,
          ``".topspin"`` or ``".opus"``;
        - ``{"frombytes": contents}`` when a dictionary was obtained and a
          first positional argument that is not None was passed;
        - otherwise the dictionary returned by `check_filenames`.

    Raises
    ------
    FileNotFoundError
        If the only filename obtained is None.
    """
    # Check the args and keywords arg to determine the correct filename

    filenames = check_filenames(*args, **kwargs)

    if filenames is None:  # not args and
        # this is probably due to a cancel action for an open dialog.
        return None

    if not isinstance(filenames, dict):
        if len(filenames) == 1 and filenames[0] is None:
            raise (FileNotFoundError)

        # deal with some specific cases
        # NOTE(review): `key` stays unbound when the first item is neither a
        # Path nor an http(s) string - confirm this cannot happen.
        if isinstance(filenames[0], Path):
            # all filename should be Path, except case of urls
            key = filenames[0].suffix.lower()
        elif filenames[0].startswith("http://") or filenames[0].startswith("https://"):
            key = pathclean(filenames[0]).suffix.lower()

        # NOTE(review): for a URL without suffix, `filenames[0]` is a str and
        # has no `.name` attribute, so the test below would raise
        # AttributeError - confirm.
        if (
            not key
            and re.match(r"^fid$|^ser$|^[1-3][ri]*$", filenames[0].name) is not None
        ):
            key = ".topspin"
        if key[1:].isdigit():
            # probably an opus file
            key = ".opus"
        return {key: filenames}

    if len(args) > 0 and args[0] is not None:
        # args where passed so in this case we have directly byte contents instead of filenames only
        contents = filenames
        return {"frombytes": contents}

    # probably no args (which means that we are coming from a dialog or from a full list of a directory
    return filenames