Source code for spectrochempy.core.readers.read_omnic

# ======================================================================================
# Copyright (©) 2015-2025 LCS - Laboratoire Catalyse et Spectrochimie, Caen, France.
# CeCILL-B FREE SOFTWARE LICENSE AGREEMENT
# See full LICENSE agreement in the root directory.
# ======================================================================================
"""
Extend NDDataset with the import method for OMNIC generated data files.

This module provides functions to read OMNIC generated data files.
"""

__all__ = ["read_omnic", "read_spg", "read_spa", "read_srs"]
__dataset_methods__ = __all__

import io
import re
from datetime import datetime
from datetime import timedelta

import numpy as np

from spectrochempy.application import info_
from spectrochempy.core.dataset.coord import Coord
from spectrochempy.core.dataset.nddataset import NDDataset
from spectrochempy.core.readers.importer import Importer
from spectrochempy.core.readers.importer import _importer_method
from spectrochempy.core.readers.importer import _openfid
from spectrochempy.core.units import ur
from spectrochempy.utils.datetimeutils import UTC
from spectrochempy.utils.datetimeutils import utcnow
from spectrochempy.utils.docreps import _docstring
from spectrochempy.utils.file import fromfile

# ======================================================================================
# Public functions
# ======================================================================================
_docstring.delete_params("Importer.see_also", "read_omnic")


[docs] @_docstring.dedent def read_omnic(*paths, **kwargs): r""" Open a Thermo Nicolet OMNIC file. Open Omnic file or a list of :file:`.spg`, :file:`.spa` or :file:`.srs` files and set data/metadata in the current dataset. The collected metadata are: - names of spectra - acquisition dates (UTC) - units of spectra (absorbance, transmittance, reflectance, Log(1/R), Kubelka-Munk, Raman intensity, photoacoustics, volts) - units of xaxis (wavenumbers in :math:`cm^{-1}`, wavelengths in nm or micrometer, Raman shift in :math:`cm^{-1}`) - spectra history (but only incorporated in the NDDataset if a single spa is read) An error is generated if attempt is made to inconsistent datasets: units of spectra and xaxis, limits and number of points of the xaxis. Parameters ---------- %(Importer.parameters)s Returns ------- %(Importer.returns)s Other Parameters ---------------- %(Importer.other_parameters)s See Also -------- read_spg : Read grouped Omnic spectra. read_spa : Read single Omnic spectra. read_srs : Read series Omnic spectra. %(Importer.see_also.no_read_omnic)s Examples -------- Reading a single OMNIC file (providing a windows type filename relative to the default `datadir` ) >>> scp.read_omnic('irdata\\nh4y-activation.spg') NDDataset: [float64] a.u. (shape: (y:55, x:5549)) Reading a single OMNIC file (providing a unix/python type filename relative to the default `datadir` ) Note that here read_omnic is called as a classmethod of the NDDataset class >>> scp.read_omnic('irdata/nh4y-activation.spg') NDDataset: [float64] a.u. (shape: (y:55, x:5549)) Single file specified with pathlib.Path object >>> from pathlib import Path >>> folder = Path('irdata') >>> p = folder / 'nh4y-activation.spg' >>> scp.read_omnic(p) NDDataset: [float64] a.u. (shape: (y:55, x:5549)) The directory can also be specified independently, either as a string or a pathlib object >>> scp.read_omnic('nh4y-activation.spg', directory=folder) NDDataset: [float64] a.u. (shape: (y:55, x:5549)) Multiple files not merged (return a list of datasets) >>> le = scp.read_omnic('irdata/nh4y-activation.spg', 'wodger.spg') >>> len(le) 2 >>> le[1] NDDataset: [float64] a.u. (shape: (y:55, x:5549)) Multiple files merged as the `merge` keyword is set to true >>> scp.read_omnic('irdata/nh4y-activation.spg', 'wodger.spg', merge=True) NDDataset: [float64] a.u. (shape: (y:57, x:5549)) Multiple files to merge : they are passed as a list (note the brakets) instead of using the keyword `merge` >>> scp.read_omnic(['irdata/nh4y-activation.spg', 'wodger.spg']) NDDataset: [float64] a.u. (shape: (y:57, x:5549)) Multiple files not merged : they are passed as a list but `merge` is set to false >>> l2 = scp.read_omnic(['irdata/nh4y-activation.spg', 'wodger.spg'], merge=False) >>> len(l2) 2 Read without a filename. This has the effect of opening a dialog for file(s) selection >>> nd = scp.read_omnic() Read in a directory (assume that only OPUS files are present in the directory (else we must use the generic `read` function instead) >>> l3 = scp.read_omnic(directory='irdata/subdir/1-20') >>> len(l3) 3 Again we can use merge to stack all 4 spectra if thet have compatible dimensions. >>> scp.read_omnic(directory='irdata/subdir', merge=True) NDDataset: [float64] a.u. (shape: (y:4, x:5549)) An example, where bytes contents are passed directly to the read_omnic method. >>> datadir = scp.preferences.datadir >>> filename1 = datadir / 'irdata' / 'subdir' / '7_CZ0-100 Pd_101.SPA' >>> content1 = filename1.read_bytes() >>> filename2 = datadir / 'wodger.spg' >>> content2 = filename2.read_bytes() >>> listnd = scp.read_omnic({filename1.name: content1, filename2.name: content2}) >>> len(listnd) 2 >>> scp.read_omnic({filename1.name: content1, filename2.name: content2}, merge=True) NDDataset: [float64] a.u. (shape: (y:3, x:5549)) """ kwargs["filetypes"] = ["OMNIC files (*.spa *.spg)", "OMNIC series (*.srs)"] kwargs["protocol"] = ["omnic", "spa", "spg", "srs"] importer = Importer() return importer(*paths, **kwargs)
[docs] @_docstring.dedent def read_spg(*paths, **kwargs): r""" Open a Thermo Nicolet file or a list of files with extension ``.spg``. Parameters ---------- %(Importer.parameters)s Returns ------- %(Importer.returns)s Other Parameters ---------------- %(Importer.other_parameters)s See Also -------- read_spg : Read grouped Omnic spectra. read_spa : Read single Omnic spectra. read_srs : Read series Omnic spectra. %(Importer.see_also.no_read_omnic)s Notes ----- This method is an alias of `read_omnic`, except that the type of file is constrained to ``.spg``. Examples -------- >>> scp.read_spg('irdata/nh4y-activation.spg') NDDataset: [float64] a.u. (shape: (y:55, x:5549)) """ kwargs["filetypes"] = ["OMNIC files (*.spg)"] kwargs["protocol"] = ["spg", "spa"] importer = Importer() return importer(*paths, **kwargs)
[docs] @_docstring.dedent def read_spa(*paths, **kwargs): r""" Open a Thermo Nicolet file or a list of files with extension ``.spa``. Parameters ---------- %(Importer.parameters)s Returns ------- %(Importer.returns)s Other Parameters ---------------- %(Importer.other_parameters)s See Also -------- %(Importer.see_also)s Notes ----- This method is an alias of `read_omnic`, except that the type of file is contrain to ``.spa``. Examples -------- >>> scp.read_spa('irdata/subdir/20-50/7_CZ0-100 Pd_21.SPA') NDDataset: [float64] a.u. (shape: (y:1, x:5549)) >>> scp.read_spa(directory='irdata/subdir', merge=True) NDDataset: [float64] a.u. (shape: (y:4, x:5549)) """ kwargs["filetypes"] = ["OMNIC files (*.spa)"] kwargs["protocol"] = ["spa"] importer = Importer() return importer(*paths, **kwargs)
[docs] @_docstring.dedent def read_srs(*paths, **kwargs): r""" Open a Thermo Nicolet file or a list of files with extension ``.srs``. Parameters ---------- %(Importer.parameters)s Returns ------- %(Importer.returns)s When return_bg is set to 'True', the series background is returned. Other Parameters ---------------- return_bg : bool, optional Default value is False. When set to 'True' returns the series background %(Importer.other_parameters)s See Also -------- %(Importer.see_also)s Notes ----- This method is an alias of `read_omnic`, except that the type of file is constrained to ``.srs``. Examples -------- >>> scp.read_srs('irdata/omnic_series/rapid_scan_reprocessed.srs') NDDataset: [float64] a.u. (shape: (y:643, x:3734)) """ kwargs["filetypes"] = ["OMNIC series (*.srs)"] kwargs["protocol"] = ["srs"] importer = Importer() return importer(*paths, **kwargs)
# ====================================================================================== # Private functions # ====================================================================================== @_importer_method def _read_spg(*args, **kwargs): # read spg file dataset, filename = args fid, kwargs = _openfid(filename, **kwargs) # Read name: # The name starts at position hex 1e = decimal 30. Its max length # is 256 bytes. It is the original filename under which the group has been saved: it # won't match with the actual filename if a subsequent renaming has been done in the # OS. spg_title = _readbtext(fid, 30, 256) # Count the number of spectra # From hex 120 = decimal 304, individual spectra are described # by blocks of lines starting with "key values", # for instance hex[02 6a 6b 69 1b 03 82] -> dec[02 106 107 105 27 03 130] # Each of these lines provides positions of data and metadata in the file: # # key: hex 02, dec 02: position of spectral header (=> nx, firstx, # lastx, nscans, nbkgscans) # key: hex 03, dec 03: intensity position # key: hex 04, dec 04: user text position # key: hex 1B, dec 27: position of History text # key: hex 64, dec 100: ? # key: hex 66 dec 102: sample interferogram # key: hex 67 dec 103: background interferogram # key: hex 69, dec 105: ? # key: hex 6a, dec 106: ? # key: hex 6b, dec 107: position of spectrum title, the acquisition # date follows at +256(dec) # key: hex 80, dec 128: ? # key: hex 82, dec 130: rotation angle ? # # the number of line per block may change from file to file but the total # number of lines is given at hex 294, hence allowing counting the # number of spectra: # read total number of lines fid.seek(294) nlines = fromfile(fid, "uint16", count=1) # read "key values" pos = 304 keys = np.zeros(nlines) for i in range(nlines): fid.seek(pos) keys[i] = fromfile(fid, dtype="uint8", count=1) pos += 16 # the number of occurrences of the key '02' is number of spectra nspec = np.count_nonzero(keys == 2) if nspec == 0: # pragma: no cover raise OSError( "Error : File format not recognized - information markers not found", ) # container to hold values nx, firstx, lastx = ( np.zeros(nspec, "int"), np.zeros(nspec, "float"), np.zeros(nspec, "float"), ) xunits = [] xtitles = [] units = [] titles = [] # Extracts positions of '02' keys key_is_02 = keys == 2 # ex: [T F F F F T F (...) F T ....]' indices02 = np.nonzero(key_is_02) # ex: [1 9 ...] position02 = ( 304 * np.ones(len(indices02[0]), dtype="int") + 16 * indices02[0] ) # ex: [304 432 ...] for i in range(nspec): # read the position of the header fid.seek(position02[i] + 2) pos_header = fromfile(fid, dtype="uint32", count=1) # get infos info = _read_header(fid, pos_header) nx[i] = info["nx"] firstx[i] = info["firstx"] lastx[i] = info["lastx"] xunits.append(info["xunits"]) xtitles.append(info["xtitle"]) units.append(info["units"]) titles.append(info["title"]) # check the consistency of xaxis and data units if np.ptp(nx) != 0: # pragma: no cover raise ValueError( "Error : Inconsistent data set -" " number of wavenumber per spectrum should be " "identical", ) if np.ptp(firstx) != 0: # pragma: no cover raise ValueError( "Error : Inconsistent data set - the x axis should start at same value", ) if np.ptp(lastx) != 0: # pragma: no cover raise ValueError( "Error : Inconsistent data set - the x axis should end at same value", ) if len(set(xunits)) != 1: # pragma: no cover raise ValueError( "Error : Inconsistent data set - data units should be identical", ) if len(set(units)) != 1: # pragma: no cover raise ValueError( "Error : Inconsistent data set - x axis units should be identical", ) data = np.ndarray((nspec, nx[0]), dtype="float32") # Now the intensity data # Extracts positions of '03' keys key_is_03 = keys == 3 indices03 = np.nonzero(key_is_03) position03 = 304 * np.ones(len(indices03[0]), dtype="int") + 16 * indices03[0] # Read number of spectral intensities for i in range(nspec): data[i, :] = _getintensities(fid, position03[i]) # Get spectra titles & acquisition dates: # container to hold values spectitles, acquisitiondates, timestamps = [], [], [] # Extract positions of '6B' keys (spectra titles & acquisition dates) key_is_6B = keys == 107 indices6B = np.nonzero(key_is_6B) position6B = 304 * np.ones(len(indices6B[0]), dtype="int") + 16 * indices6B[0] # Read spectra titles and acquisition date for i in range(nspec): # determines the position of informatioon fid.seek(position6B[i] + 2) # go to line and skip 2 bytes spa_title_pos = fromfile(fid, "uint32", 1) # read omnic filename spa_title = _readbtext(fid, spa_title_pos, 256) spectitles.append(spa_title) # and the acquisition date fid.seek(spa_title_pos + 256) timestamp = fromfile(fid, dtype="uint32", count=1) # since 31/12/1899, 00:00 acqdate = datetime(1899, 12, 31, 0, 0, tzinfo=UTC) + timedelta( seconds=int(timestamp), ) acquisitiondates.append(acqdate) timestamp = acqdate.timestamp() # Transform back to timestamp for storage in the Coord object # use datetime.fromtimestamp(d, timezone.utc)) # to transform back to datetime object timestamps.append(timestamp) # Not used at present # ------------------- # extract positions of '1B' codes (history text), sometimes absent, # e.g. peakresolve) # key_is_1B = (keys == 27) # indices1B = # np.nonzero(key_is_1B) # position1B = 304 * np.ones(len(indices1B[0]), dtype='int') + 16 * indices6B[0] # if len(position1B) != 0: # read history texts # for j in range(nspec): determine the position of information # f.seek(position1B[j] + 2) # # history_pos = fromfile(f, 'uint32', 1) # history = _readbtext(f, history_pos[0]) # allhistories.append(history) fid.close() # Create Dataset Object of spectral content dataset.data = data dataset.units = units[0] dataset.title = titles[0] dataset.name = filename.stem dataset.filename = filename # now add coordinates _x = Coord.linspace( firstx[0], lastx[0], nx[0], title=xtitles[0], units=xunits[0], ) _y = Coord( timestamps, title="acquisition timestamp (GMT)", units="s", labels=(acquisitiondates, spectitles), ) dataset.set_coordset(y=_y, x=_x) # Set description, date and history # Omnic spg file don't have specific "origin" field stating the oirigin of the data dataset.description = kwargs.get( "description", f"Omnic title: {spg_title}\nOmnic filename: {filename}", ) dataset._date = utcnow() dataset.history = f"Imported from spg file {filename}." if kwargs.pop("sortbydate", True): dataset.sort(dim="y", inplace=True) dataset.history = "Sorted by date" # debug_("end of reading") return dataset @_importer_method def _read_spa(*args, **kwargs): dataset, filename = args fid, kwargs = _openfid(filename, **kwargs) return_ifg = kwargs.get("return_ifg", None) # Read name: # The name starts at position hex 1e = decimal 30. Its max length # is 256 bytes. It is the original filename under which the spectrum has # been saved: it won't match with the actual filename if a subsequent # renaming has been done in the OS. spa_name = _readbtext(fid, 30, 256) # The acquisition date (GMT) is at hex 128 = decimal 296. # Second since 31/12/1899, 00:00 fid.seek(296) timestamp = fromfile(fid, dtype="uint32", count=1) acqdate = datetime(1899, 12, 31, 0, 0, tzinfo=UTC) + timedelta( seconds=int(timestamp), ) acquisitiondate = acqdate # Transform back to timestamp for storage in the Coord object # use datetime.fromtimestamp(d, timezone.utc)) to transform back to datetime object timestamp = acqdate.timestamp() # From hex 120 = decimal 304, the spectrum is described # by a block of lines starting with "key values", # for instance hex[02 6a 6b 69 1b 03 82] -> dec[02 106 107 105 27 03 130] # Each of these lines provides positions of data and metadata in the file: # # key: hex 02, dec 02: position of spectral header (=> nx, # firstx, lastx, nscans, nbkgscans) # key: hex 03, dec 03: intensity position # # key: hex 04, dec 04: user text position (custom info, can be present # several times. The text length is five bytes later) # key: hex 1B, dec 27: position of History text, The text length # is five bytes later # key: hex 53, dec 83: probably not a position, present when 'Retrieved from library' # key: hex 64, dec 100: ? # key: hex 66 dec 102: sample interferogram # key: hex 67 dec 103: background interferogram # key: hex 69, dec 105: ? # key: hex 6a, dec 106: ? # key: hex 80, dec 128: ? # key: hex 82, dec 130: position of 'Experiment Information', The text length # is five bytes later. The block gives Experiment filename (at +10) # Experiment title (+90), custom text (+254), accessory name (+413) # key: hex 92, dec 146: position of 'custom infos', The text length # is five bytes later. # # The line preceding the block start with '01' or '0A' # The lines after the block generally start with '00', except in few cases where # they start by '01'. In such cases, the '53' key is also present # (before the '1B'). # scan "key values" pos = 304 spa_comments = [] # several custom comments can be present while "continue": fid.seek(pos) key = fromfile(fid, dtype="uint8", count=1) # print(key, end=' ; ') if key == 2: # read the position of the header fid.seek(pos + 2) pos_header = fromfile(fid, dtype="uint32", count=1) info = _read_header(fid, pos_header) elif key == 3 and return_ifg is None: intensities = _getintensities(fid, pos) elif key == 4: fid.seek(pos + 2) comments_pos = fromfile(fid, "uint32", 1) fid.seek(pos + 6) comments_len = fromfile(fid, "uint32", 1) fid.seek(comments_pos) spa_comments.append(fid.read(comments_len).decode("latin-1", "replace")) elif key == 27: fid.seek(pos + 2) history_pos = fromfile(fid, "uint32", 1) fid.seek(pos + 6) history_len = fromfile(fid, "uint32", 1) spa_history = _readbtext(fid, history_pos, history_len) elif key == 102 and return_ifg == "sample": s_ifg_intensities = _getintensities(fid, pos) elif key == 103 and return_ifg == "background": b_ifg_intensities = _getintensities(fid, pos) elif key == 00 or key == 1: break pos += 16 fid.close() if (return_ifg == "sample" and "s_ifg_intensities" not in locals()) or ( return_ifg == "background" and "b_ifg_intensities" not in locals() ): info_("No interferogram found, read_spa returns None") return None if return_ifg == "sample": intensities = s_ifg_intensities elif return_ifg == "background": intensities = b_ifg_intensities # load intensity into the NDDataset dataset.data = np.array(intensities[np.newaxis], dtype="float32") if return_ifg == "background": title = "sample acquisition timestamp (GMT)" # bckg acquisition date is not known for the moment... else: title = "acquisition timestamp (GMT)" # no ambiguity here _y = Coord( [timestamp], title=title, units="s", labels=([acquisitiondate], [filename]), ) # useful when a part of the spectrum/ifg has been blanked: dataset.mask = np.isnan(dataset.data) if return_ifg is None: default_description = f"# Omnic name: {spa_name}\n# Filename: {filename.name}" dataset.units = info["units"] dataset.title = info["title"] # now add coordinates nx = info["nx"] firstx = info["firstx"] lastx = info["lastx"] xunit = info["xunits"] xtitle = info["xtitle"] _x = Coord.linspace( firstx, lastx, int(nx), title=xtitle, units=xunit, ) else: # interferogram if return_ifg == "sample": default_description = ( f"# Omnic name: {spa_name} : sample IFG\n # Filename: {filename.name}" ) else: default_description = f"# Omnic name: {spa_name} : background IFG\n # Filename: {filename.name}" spa_name += ": Sample IFG" dataset.units = "V" dataset.title = "detector signal" _x = Coord.arange( len(intensities), title="data points", units=None, ) dataset.set_coordset(y=_y, x=_x) dataset.name = spa_name # to be consistent with omnic behaviour dataset.filename = filename # Set origin, description, history, date # Omnic spg file don't have specific "origin" field stating the oirigin of the data dataset.description = kwargs.get("description", default_description) + "\n" if len(spa_comments) > 1: dataset.description += "# Comments from Omnic:\n" for comment in spa_comments: dataset.description += comment + "\n---------------------\n" dataset.history = "Imported from spa file(s)" if "spa_history" in locals() and len("spa_history".strip(" ")) > 0: dataset.history = ( "Data processing history from Omnic :\n------------------------------------\n" + spa_history ) dataset._date = utcnow() dataset.meta.collection_length = info["collection_length"] / 100 * ur("s") dataset.meta.optical_velocity = info["optical_velocity"] dataset.meta.laser_frequency = info["reference_frequency"] * ur("cm^-1") if dataset.x.units is None and dataset.x.title == "data points": # interferogram dataset.meta.interferogram = True dataset.meta.td = list(dataset.shape) dataset.x._zpd = int(np.argmax(dataset)[-1]) dataset.x.set_laser_frequency() dataset.x._use_time_axis = ( False # True to have time, else it will be optical path difference ) return dataset @_importer_method def _read_srs(*args, **kwargs): dataset, filename = args frombytes = kwargs.get("frombytes", False) return_bg = kwargs.get("return_bg", False) # in this case, filename is actually a byte content fid = io.BytesIO(filename) if frombytes else open(filename, "rb") # noqa: SIM115 # read the file and determine whether it is a rapidscan or a high speed real time is_rapidscan, is_highspeed, is_tg = False, False, False """ At pos=304 (hex:130) is the position of the '02' key for series. Here we don't use it. Instead, we use one of the following sequence : RapidScan series: ---------------- the following sequence appears 3 times in the file b'\x02\x00\x00\x00\x18\x00\x00\x00\x00\x00\x48\x43\x00\x50\x43\x47 They are used to assert the srs file is rapid_scan and to locate headers and data: - The 1st one is located 152 bytes after the series header position - The 2nd one is located 152 bytes before the background header position and 56 bytes before either the background data / or the background title and infos followed by the background data - The 3rd one is located 60 bytes before the series data (spectre/ifg names and intensities High Speed Real time series: --------------------------- the following sequence appears 4 times in the file: b"\x02\x00\x00\x00\x18\x00\x00\x00\x00\x00\x48\x43\x00\xc8\xaf\x47" They are used to assert the srs file is high speed and to locate headers and data: - The 1st one is located 152 bytes after the series header position - The 2nd one is located 152 bytes before the background header position and 56 bytes before either the background data / or the background title and infos followed by the background data - The 3rd one is located 60 bytes before some data (don't know yet what it is) - The 4th one is located 60 bytes before the series data (spectra) TGA/IR or GC series: --------------------------- the following sequence appears 3 times in the file: b"\x02\x00\x00\x00\x18\x00\x00\x00\x00\x00", the next bytes can differ from one file to another. As it is common to other types of TG/IR they will be used to assert if the srs file is TGA/IR or GC *after the other formats*. They also allows locating headers and data: - The 1st one is located 152 bytes after the series header position - The 2nd one is located 152 bytes before the background header position and 56 bytes before either the background data / or the background title and infos followed by the background data - The 3rd one is located 60 bytes before the series data (spectre/ifg names and intensities ? """ sub_rs = b"\x02\x00\x00\x00\x18\x00\x00\x00\x00\x00\x48\x43\x00\x50\x43\x47" sub_hs = b"\x02\x00\x00\x00\x18\x00\x00\x00\x00\x00\x48\x43\x00\xc8\xaf\x47" sub_tg = b"\x02\x00\x00\x00\x18\x00\x00\x00\x00\x00" # find the first occurence and determine whether the srs is rapidscan or high # speed real time fid.seek(0) bytestring = fid.read() # try rapidscan first: pos = bytestring.find(sub_rs, 1) if pos > 0: is_rapidscan = True else: # not rapidscan, try high speed real time pos = bytestring.find(sub_hs, 1) if pos > 0: is_highspeed = True else: # neith rapid scan nor high speed real time, try TGA/IR pos = bytestring.find(sub_tg, 1) if pos > 0: is_tg = True else: raise NotImplementedError( "The reader is only implemented for Rapid Scan, " "High Speed Real Time, GC or TGA srs files. If you think " "your file belongs to one of these types, or if " "you'd like an update of the reader to read your " "file type, please report the issue on " "https://github.com/spectrochempy/spectrochempy" "/issues ", ) if is_rapidscan: # determine whether the srs is reprocessed. At pos=292 (hex:124) appears a # difference between pristine and reprocessed series fid.seek(292) key = fromfile(fid, dtype="uint8", count=16)[0] if key == 39: # (hex: 27) is_reprocessed = False elif key == 15: # (hex = 0F) is_reprocessed = True else: raise NotImplementedError( "The file is not recognized as a Rapid Scan " "srs file. Please report the issue on " "https://github.com/spectrochempy/spectrochempy" "/issues ", ) # find the 2 following starting indexes of sub_rs. # we will use the 1st (-> series info), the 2nd (-> background) and # the 3rd (-> data) fid.seek(0) bytestring = fid.read() index = [pos] while pos != -1: pos = bytestring.find(sub_rs, pos + 1) index.append(pos) index = np.array(index[:-1]) + [-152, -152, 60] if len(index) != 3: raise NotImplementedError( "The file is not recognized as a Rapid Scan " "srs file. Please report the issue on " "https://github.com/spectrochempy/spectrochempy" "/issues ", ) pos_info_data = index[0] pos_info_bg = index[1] pos_data = index[2] # read series data, except if the user asks for the background if not return_bg: info = _read_header(fid, pos_info_data) names, data = _read_srs_spectra(fid, pos_data, info["ny"], info["nx"]) # now get series history if not is_reprocessed: history = info["history"] else: # In reprocessed series the updated "DATA PROCESSING HISTORY" is located right after # the following 16 byte sequence: sub = ( b"\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff" ) pos = bytestring.find(sub) + 16 history = _readbtext(fid, pos, None) # read the background if the user asked for it. if return_bg: # First get background info info = _read_header(fid, pos_info_bg) if "background_name" not in info: # it is a short header fid.seek(index[1] + 208) data = fromfile(fid, dtype="float32", count=info["nx"]) else: # longer header, in such case the header indicates a spectrum # but the data are those of an ifg... For now need more examples return None # uncomment below to load the last datafield has the same dimension as the time axis # its function is not known. related to Grams-schmidt ? # pos = _nextline(pos) # found = False # while not found: # pos += 16 # f.seek(pos) # key = fromfile(f, dtype='uint8', count=1) # if key == 1: # pos += 4 # f.seek(pos) # X = fromfile(f, dtype='float32', count=info['ny']) # found = True # # X = NDDataset(X) # _x = Coord(np.around(np.linspace(0, info['ny']-1, info['ny']), 0), # title='time', # units='minutes') # X.set_coordset(x=_x) # X.name = '?' # X.title = '?' # X.description = 'unknown' # X.history = str(datetime.now(timezone.utc)) + ':imported from srs if is_highspeed: # find the 3 following starting indexes of sub. # 1st -> series info), # 2nd -> background ? # 3rd -> data ? # 4th -> ? fid.seek(0) bytestring = fid.read() index = [pos] while pos != -1: pos = bytestring.find(sub_hs, pos + 1) index.append(pos) index = np.array(index[:-1]) + [-152, -152, 0, 60] pos_info_data = index[0] pos_bg = index[1] pos_x = index[2] pos_data = index[3] if len(index) != 4: raise NotImplementedError( "The file is not recognized as a High Speed Real " "Time srs file. Please report the issue on " "https://github.com/spectrochempy/spectrochempy" "/issues ", ) if not return_bg: info = _read_header(fid, pos_info_data) # container for names and data names, data = _read_srs_spectra(fid, pos_data, info["ny"], info["nx"]) # Get series history. on the sample file, the history seems overwritten by # some post-processing, so info["history"] returns a corrupted string. # The "DATA PROCESSING HISTORY" (as indicated by omnic) is located right # after the following 16 byte sequence: sub = b"\x00\x00\x00\x00\x10\x00\x00\x00\x00\x00\x00\x00\x00\x00\xff\xff" pos = bytestring.find(sub) + 16 history = _readbtext(fid, pos, None) # read the background if the user asked for it. elif return_bg: # First get background info info = _read_header(fid, pos_bg) if "background_name" not in info: # it is a short header fid.seek(index[1] + 208) data = fromfile(fid, dtype="float32", count=info["nx"]) else: # longer header, in such case the header indicates a spectrum # but the data are those of an ifg... For now need more examples return None if is_tg: fid.seek(0) bytestring = fid.read() index = [pos] while pos != -1: pos = bytestring.find(sub_tg, pos + 1) index.append(pos) index = np.array(index[:-1]) + [-152, -152, 60] if len(index) != 3: raise NotImplementedError( "The file is not recognized as a TG IR or GC " "srs file. Please report the issue on " "https://github.com/spectrochempy/spectrochempy" "/issues ", ) pos_info_data = index[0] pos_info_bg = index[1] pos_data = index[2] # read series data, except if the user asks for the background if not return_bg: info = _read_header(fid, pos_info_data) names, data = _read_srs_spectra(fid, pos_data, info["ny"], info["nx"]) # Note: info["history"] is empty in TG IR or GC series # the position of the history is indiated at pos 856 or 878 depending on the # file. # read the background if the user asked for it. if return_bg: # First get background info info = _read_header(fid, pos_info_bg) if "background_name" not in info: # it is a short header fid.seek(index[1] + 208) data = fromfile(fid, dtype="float32", count=info["nx"]) else: # longer header, in such case the header indicates a spectrum # but the data are those of an ifg... For now need more examples return None # Create NDDataset Object for the series if not return_bg: dataset = NDDataset(data) else: dataset = NDDataset(np.expand_dims(data, axis=0)) # in case part of the spectra/ifg has been blanked: dataset.mask = np.isnan(dataset.data) dataset.units = info["units"] dataset.title = info["title"] dataset.origin = "omnic" dataset.filename = filename # now add coordinates _x = Coord.linspace( info["firstx"], info["lastx"], int(info["nx"]), title=info["xtitle"], units=info["xunits"], ) # specific infos for series data if not return_bg: dataset.name = info["name"] _y = Coord( np.around(np.linspace(info["firsty"], info["lasty"], info["ny"]), 3), title="Time", units="minute", labels=names, ) else: _y = Coord() dataset.set_coordset(y=_y, x=_x) # Set origin, description and history dataset.origin = "omnic" dataset.description = kwargs.get("description", "Dataset from omnic srs file.") if "history" in locals(): dataset.history.append( "Omnic 'DATA PROCESSING HISTORY' :\n" "--------------------------------\n" + history, ) dataset.history.append(str(utcnow()) + ": imported from srs file " + str(filename)) dataset.meta.laser_frequency = info["reference_frequency"] * ur("cm^-1") dataset.meta.collection_length = info["collection_length"] * ur("s") if dataset.x.units is None and dataset.x.title == "data points": # interferogram dataset.meta.interferogram = True dataset.meta.td = list(dataset.shape) dataset.x._zpd = int(np.argmax(dataset)[-1]) # zero path difference dataset.x.set_laser_frequency() dataset.x._use_time_axis = ( False # True to have time, else it will be optical path difference ) fid.close() return dataset def _readbtext(fid, pos, size): # Read some text in binary file of given size. If size is None, the etxt is read # until b\0\ is encountered. # Returns utf-8 string fid.seek(pos) if size is None: btext = b"" while fid.read(1) != b"\x00": btext += fid.read(1) else: btext = fid.read(size) btext = re.sub(b"\x00+", b"\n", btext) if btext[:1] == b"\n": btext = btext[1:] if btext[-1:] == b"\n": btext = btext[:-1] try: text = btext.decode(encoding="utf-8") # decode btext to string except UnicodeDecodeError: try: text = btext.decode(encoding="latin_1") except UnicodeDecodeError: # pragma: no cover text = btext.decode(encoding="utf-8", errors="ignore") return text def _nextline(pos): # reset current position to the beginning of next line (16 bytes length) return 16 * (1 + pos // 16) def _read_header(fid, pos): r""" Read spectrum/ifg/series header. Parameters ---------- fid : BufferedReader The buffered binary stream. pos : int The position of the header (see Notes). Returns ------- dict, int Dictionary and current position in file Notes ----- So far, the header structure is as follows: - starts with b'\x01' , b'\x02', b'\x03' ... maybe indicating the header "type" - nx (UInt32): 4 bytes behind - xunits (UInt8): 8 bytes behind. So far, we have the following correspondence: * `x\01` : wavenumbers, cm-1 * `x\02` : datapoints (interferogram) * `x\03` : wavelength, nm * `x\04' : wavelength, um * `x\20' : Raman shift, cm-1 - data units (UInt8): 12 bytes behind. So far, we have the following correspondence: * `x\11` : absorbance * `x\10` : transmittance (%) * `x\0B` : reflectance (%) * `x\0C` : Kubelka_Munk * `x\16` : Volts (interferogram) * `x\1A` : photoacoustic * `x\1F` : Raman intensity - first x value (float32), 16 bytes behind - last x value (float32), 20 bytes behind - ... unknown - scan points (UInt32), 28 bytes behind - zpd (UInt32), 32 bytes behind - number of scans (UInt32), 36 bytes behind - ... unknown - number of background scans (UInt32), 52 bytes behind - ... unknown - collection length in 1/100th of sec (UIint32), 68 bytes behind - ... unknown - reference frequency (float32), 80 bytes behind - ... - optical velocity (float32), 188 bytes behind - ... - spectrum history (text), 208 bytes behind For "rapid-scan" srs files: - series name (text), 938 bytes behind - collection length (float32), 1002 bytes behind - last y (float 32), 1006 bytes behind - first y (float 32), 1010 bytes behind - ny (UInt32), 1026 - ... y unit could be at pos+1030 with 01 = minutes ? - history (text), 1200 bytes behind (only initial history. When reprocessed, updated history is at the end of the file after the b`\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff` sequence """ out = {} # determine the type of file fid.seek(0) bytes = fid.read(18) if bytes == b"Spectral Data File": filetype = "spa, spg" elif bytes == b"Spectral Exte File": filetype = "srs" # nx fid.seek(pos + 4) out["nx"] = fromfile(fid, "uint32", count=1) # xunits fid.seek(pos + 8) key = fromfile(fid, dtype="uint8", count=1) if key == 1: out["xunits"] = "cm^-1" out["xtitle"] = "wavenumbers" elif key == 2: out["xunits"] = None out["xtitle"] = "data points" elif key == 3: # pragma: no cover out["xunits"] = "nm" out["xtitle"] = "wavelengths" elif key == 4: # pragma: no cover out["xunits"] = "um" out["xtitle"] = "wavelengths" elif key == 32: # pragma: no cover out["xunits"] = "cm^-1" out["xtitle"] = "raman shift" else: # pragma: no cover out["xunits"] = None out["xtitle"] = "xaxis" info_("The nature of x data is not recognized, xtitle is set to 'xaxis'") # data units fid.seek(pos + 12) key = fromfile(fid, dtype="uint8", count=1) if key == 17: out["units"] = "absorbance" out["title"] = "absorbance" elif key == 16: # pragma: no cover out["units"] = "percent" out["title"] = "transmittance" elif key == 11: # pragma: no cover out["units"] = "percent" out["title"] = "reflectance" elif key == 12: # pragma: no cover out["units"] = None out["title"] = "log(1/R)" elif key == 20: # pragma: no cover out["units"] = "Kubelka_Munk" out["title"] = "Kubelka-Munk" elif key == 21: out["units"] = None out["title"] = "reflectance" elif key == 22: out["units"] = "V" out["title"] = "detector signal" elif key == 26: # pragma: no cover out["units"] = None out["title"] = "photoacoustic" elif key == 31: # pragma: no cover out["units"] = None out["title"] = "Raman intensity" else: # pragma: no cover out["units"] = None out["title"] = "intensity" info_("The nature of data is not recognized, title set to 'Intensity'") # firstx, lastx fid.seek(pos + 16) out["firstx"] = fromfile(fid, "float32", 1) fid.seek(pos + 20) out["lastx"] = fromfile(fid, "float32", 1) fid.seek(pos + 28) out["scan_pts"] = fromfile(fid, "uint32", 1) fid.seek(pos + 32) out["zpd"] = fromfile(fid, "uint32", 1) fid.seek(pos + 36) out["nscan"] = fromfile(fid, "uint32", 1) fid.seek(pos + 52) out["nbkgscan"] = fromfile(fid, "uint32", 1) fid.seek(pos + 68) out["collection_length"] = fromfile(fid, "uint32", 1) fid.seek(pos + 80) out["reference_frequency"] = fromfile(fid, "float32", 1) fid.seek(pos + 188) out["optical_velocity"] = fromfile(fid, "float32", 1) if filetype == "spa, spg": out["history"] = _readbtext(fid, pos + 208, None) if filetype == "srs": if out["nbkgscan"] == 0 and out["firstx"] > out["lastx"]: # an interferogram in rapid scan mode out["firstx"], out["lastx"] = out["lastx"], out["firstx"] out["name"] = _readbtext(fid, pos + 938, 256) fid.seek(pos + 1002) out["collection_length"] = fromfile(fid, "float32", 1) * 60 fid.seek(pos + 1006) out["lasty"] = fromfile(fid, "float32", 1) fid.seek(pos + 1010) out["firsty"] = fromfile(fid, "float32", 1) fid.seek(pos + 1026) out["ny"] = fromfile(fid, "uint32", 1) # y unit could be at pos+1030 with 01 = minutes ? out["history"] = _readbtext(fid, pos + 1200, None) if _readbtext(fid, pos + 208, 256)[:10] == "Background": # it is the header of a background out["background_name"] = _readbtext(fid, pos + 208, 256)[10:] return out def _read_srs_spectra(fid, pos_data, n_spectra, n_points): """ Read the spectra/interferogram names and data of a series. fid: BufferedReader pos_data: int n_spectra: int n_points: int returns: names (list), spectral data (ndarray) """ # container for names and data names = [] data = np.zeros((n_spectra, n_points)) # read the spectra/interferogram names and data # the first one.... pos = pos_data names.append(_readbtext(fid, pos, 256)) pos += 84 fid.seek(pos) data[0, :] = fromfile(fid, dtype="float32", count=n_points)[:] pos += n_points * 4 # ... and the remaining ones: for i in np.arange(n_spectra)[1:]: pos += 16 names.append(_readbtext(fid, pos, 256)) pos += 84 fid.seek(pos) data[i, :] = fromfile(fid, dtype="float32", count=n_points)[:] pos += n_points * 4 return names, data def _getintensities(fid, pos): # get intensities from the 03 (spectrum) # or 66 (sample ifg) or 67 (bg ifg) key, # returns a ndarray fid.seek(pos + 2) # skip 2 bytes intensity_pos = fromfile(fid, "uint32", 1) fid.seek(pos + 6) intensity_size = fromfile(fid, "uint32", 1) nintensities = int(intensity_size / 4) # Read and return spectral intensities fid.seek(intensity_pos) return fromfile(fid, "float32", int(nintensities))