# ======================================================================================
# Copyright (©) 2015-2025 LCS - Laboratoire Catalyse et Spectrochimie, Caen, France.
# CeCILL-B FREE SOFTWARE LICENSE AGREEMENT
# See full LICENSE agreement in the root directory.
# ======================================================================================
__all__ = ["fft", "ifft", "mc", "ps", "ht"]
__dataset_methods__ = __all__
import re
import numpy as np
from quaternion import as_float_array
from scipy.signal import hilbert
from spectrochempy.application import error_
from spectrochempy.core.dataset.coord import Coord
from spectrochempy.core.units import ur
from spectrochempy.processing.fft.zero_filling import zf_size
from spectrochempy.utils.decorators import _units_agnostic_method
from spectrochempy.utils.misc import as_quaternion
from spectrochempy.utils.misc import get_component
from spectrochempy.utils.misc import largest_power_of_2
from spectrochempy.utils.misc import typequaternion
# ======================================================================================
# Private methods
# ======================================================================================
def _fft(data):
if data.dtype == typequaternion:
dr = get_component(data, "R")
fr = np.fft.fftshift(np.fft.fft(dr), -1)
di = get_component(data, "I")
fi = np.fft.fftshift(np.fft.fft(di), -1)
# rebuild the quaternion
data = as_quaternion(fr, fi)
else:
data = np.fft.fftshift(np.fft.fft(data), -1)
return data
def _ifft(data):
if data.dtype == typequaternion:
fr = get_component(data, "R")
dr = np.fft.ifft(np.fft.ifftshift(fr, -1))
fi = get_component(data, "I")
di = np.fft.ifft(np.fft.ifftshift(fi, -1))
# rebuild the quaternion
data = as_quaternion(dr, di)
else:
data = np.fft.ifft(np.fft.ifftshift(data, -1))
return data
def _fft_positive(data):
if data.dtype == typequaternion:
dr = get_component(data, "R")
fr = np.fft.fftshift(np.fft.ifft(dr).astype(data.dtype)) * data.shape[-1]
di = get_component(data, "I")
fi = np.fft.fftshift(np.fft.ifft(di).astype(data.dtype)) * data.shape[-1]
# rebuild the quaternion
data = as_quaternion(fr, fi)
else:
data = np.fft.fftshift(np.fft.ifft(data).astype(data.dtype)) * data.shape[-1]
return data
def _ifft_positive(data):
if data.dtype == typequaternion:
fr = get_component(data, "R")
dr = np.fft.fft(np.fft.ifftshift(fr, -1)) * data.shape[-1]
fi = get_component(data, "I")
di = np.fft.fft(np.fft.ifftshift(fi, -1)) * data.shape[-1]
# rebuild the quaternion
data = as_quaternion(dr, di)
else:
data = np.fft.fft(np.fft.ifftshift(data, -1)) * data.shape[-1]
return data
def _states_fft(data, tppi=False):
# FFT transform according to STATES encoding
# warning: at this point, data must have been swapped so the last dimension is the one used for FFT
wt, yt, xt, zt = as_float_array(
data,
).T # x and y are exchanged due to swapping of dims
w, y, x, z = wt.T, yt.T, xt.T, zt.T
sr = (w - 1j * y) / 2.0
si = (x - 1j * z) / 2.0
if tppi:
sr[..., 1::2] = -sr[..., 1::2]
si[..., 1::2] = -si[..., 1::2]
fr = np.fft.fftshift(np.fft.fft(sr), -1)
fi = np.fft.fftshift(np.fft.fft(si), -1)
# rebuild the quaternion
return as_quaternion(fr, fi)
def _echoanti_fft(data):
# FFT transform according to ECHO-ANTIECHO encoding
# warning: at this point, data must have been swapped so the last dimension is the one used for FFT
wt, yt, xt, zt = as_float_array(
data,
).T # x and y are exchanged due to swapping of dims
w, y, x, z = wt.T, xt.T, yt.T, zt.T
c = (w + y) + 1j * (w - y)
s = (x + z) - 1j * (x - z)
fc = np.fft.fftshift(np.fft.fft(c / 2.0), -1)
fs = np.fft.fftshift(np.fft.fft(s / 2.0), -1)
return as_quaternion(fc, fs)
def _tppi_fft(data):
# FFT transform according to TPPI encoding
# warning: at this point, data must have been swapped so the last dimension is the one used for FFT
wt, yt, xt, zt = as_float_array(
data,
).T # x and y are exchanged due to swapping of dims
w, y, x, z = wt.T, xt.T, yt.T, zt.T
sx = w + 1j * y
sy = x + 1j * z
sx[..., 1::2] = -sx[..., 1::2]
sy[..., 1::2] = -sy[..., 1::2]
fx = np.fft.fftshift(np.fft.fft(sx), -1) # reverse
fy = np.fft.fftshift(np.fft.fft(sy), -1)
# rebuild the quaternion
return as_quaternion(fx, fy)
def _qf_fft(data):
# FFT transform according to QF encoding
return np.fft.fftshift(np.fft.fft(np.conjugate(data)), -1)
def _interferogram_fft(data):
"""FFT transform for rapid-scan interferograms. Phase corrected using the Mertz method."""
def _get_zpd(data, mode="max"):
if mode == "max":
return np.argmax(data, -1)
if mode == "abs":
return int(np.argmax(np.abs(data), -1))
return None
zpd = _get_zpd(data, mode="abs")
size = data.shape[-1]
# Compute Mertz phase correction
w = np.arange(0, zpd) / zpd
ma = np.concatenate((w, w[::-1]))
dma = np.zeros_like(data)
dma[..., 0 : 2 * zpd] = data[..., 0 : 2 * zpd] * ma[0 : 2 * zpd]
dma = np.roll(dma, -zpd)
dma[0] = dma[0] / 2.0
dma[-1] = dma[-1] / 2.0
dma = np.fft.rfft(dma)[..., 0 : size // 2]
phase = np.arctan(dma.imag / dma.real)
# Make final phase corrected spectrum
w = np.arange(0, 2 * zpd) / (2 * zpd)
mapod = np.ones_like(data)
mapod[..., 0 : 2 * zpd] = w
data = np.roll(data * mapod, int(-zpd))
data = np.fft.rfft(data)[..., 0 : size // 2] * np.exp(-1j * phase)
# The imaginary part can be now discarder
return data.real[..., ::-1] / 2.0
# ======================================================================================
# Public methods
# ======================================================================================
[docs]
def ifft(dataset, size=None, **kwargs):
"""
Apply a inverse fast fourier transform.
For multidimensional NDDataset,
the apodization is by default performed on the last dimension.
The data in the last dimension MUST be in frequency (or without dimension)
or an error is raised.
To make direct Fourier transform, i.e., from frequency to time domain, use the `fft` transform.
Parameters
----------
dataset : `NDDataset`
The dataset on which to apply the fft transformation.
size : int, optional
Size of the transformed dataset dimension - a shorter parameter is `si` . by default, the size is the closest
power of two greater than the data size.
**kwargs
Optional keyword parameters (see Other Parameters).
Returns
-------
out
Transformed `NDDataset` .
Other Parameters
----------------
dim : str or int, optional, default='x'.
Specify on which dimension to apply this method. If `dim` is specified as an integer it is equivalent
to the usual `axis` numpy parameter.
inplace : bool, optional, default=False.
True if we make the transform inplace. If False, the function return a new object
See Also
--------
fft : Direct Fourier transform.
"""
return fft(dataset, size=size, inv=True, **kwargs)
[docs]
def fft(dataset, size=None, sizeff=None, inv=False, ppm=True, **kwargs):
"""
Apply a complex fast fourier transform.
For multidimensional NDDataset,
the apodization is by default performed on the last dimension.
The data in the last dimension MUST be in time-domain (or without dimension)
or an error is raised.
To make reverse Fourier transform, i.e., from frequency to time domain, use the `ifft` transform
(or equivalently, the `inv=True` parameters.
Parameters
----------
dataset : `NDDataset`
The dataset on which to apply the fft transformation.
size : int, optional
Size of the transformed dataset dimension - a shorter parameter is `si` . by default, the size is the closest
power of two greater than the data size.
sizeff : int, optional
The number of effective data point to take into account for the transformation. By default it is equal to the
data size, but may be smaller.
inv : bool, optional, default=False
If True, an inverse Fourier transform is performed - size parameter is not taken into account.
ppm : bool, optional, default=True
If True, and data are from NMR, then a ppm scale is calculated instead of frequency.
**kwargs
Optional keyword parameters (see Other Parameters).
Returns
-------
out
Transformed `NDDataset` .
Other Parameters
----------------
dim : str or int, optional, default='x'.
Specify on which dimension to apply this method. If `dim` is specified as an integer it is equivalent
to the usual `axis` numpy parameter.
inplace : bool, optional, default=False.
True if we make the transform inplace. If False, the function return a new object
tdeff : int, optional
Alias of sizeff (specific to NMR). If both sizeff and tdeff are passed, sizeff has the priority.
See Also
--------
ifft : Inverse Fourier transform.
"""
# datatype
is_nmr = dataset.origin.lower() in [
"topspin",
]
is_ir = dataset.meta.interferogram
# On which axis do we want to apply transform (get axis from arguments)
dim = kwargs.pop("dim", kwargs.pop("axis", -1))
axis, dim = dataset.get_axis(dim, negative_axis=True)
# output dataset inplace or not
inplace = kwargs.pop("inplace", False)
new = dataset.copy() if not inplace else dataset
# The last dimension is always the dimension on which we apply the fourier transform.
# If needed, we swap the dimensions to be sure to be in this situation
swapped = False
if axis != -1:
new.swapdims(axis, -1, inplace=True) # must be done in place
swapped = True
# Select the last coordinates
x = new.coordset[dim]
# Performs some dimentionality checking
error = False
if (
not inv
and not x.unitless
and not x.dimensionless
and x.units.dimensionality != "[time]"
):
error_(
Exception,
"fft apply only to dimensions with [time] dimensionality or dimensionless coords\n"
"fft processing was thus cancelled",
)
error = True
elif (
inv
and not x.unitless
and x.units.dimensionality != "1/[time]"
and not x.dimensionless
):
error_(
Exception,
"ifft apply only to dimensions with [frequency] dimensionality or with ppm units "
"or dimensionless coords.\n ifft processing was thus cancelled",
)
error = True
# Should not be masked
elif new.is_masked:
error_(
Exception,
"current fft or ifft processing does not support masked data as input.\n processing was thus cancelled",
)
error = True
# Coordinates should be uniformly spaced (linear coordinate)
if not x.linear:
error_(
"fft or ifft processing only support linear coordinates.\n"
"Processing was thus cancelled",
)
error = True
if hasattr(x, "_use_time_axis"):
x._use_time_axis = True # we need to have dimentionless or time units
if not error:
# OK we can proceed
# time domain size
td = None
if not inv:
td = x.size
# if no size (or si) parameter then use the size of the data
# (size not used for inverse transform
if size is None or inv:
size = kwargs.get("si", x.size)
# we default to the closest power of two larger of the data size
if is_nmr:
size = largest_power_of_2(size)
# do we have an effective td to apply
tdeff = sizeff
if tdeff is None:
tdeff = kwargs.get("tdeff", td)
if tdeff is None or tdeff < 5 or tdeff > size:
tdeff = size
# Eventually apply the effective size
new[..., tdeff:] = 0.0
# Should we work on complex or hypercomplex data
# interleaved is in case of >2D data ( # TODO: >D not yet implemented in ndcomplex.py
iscomplex = False
if axis == -1:
iscomplex = new.is_complex
if new.is_quaternion or new.is_interleaved:
iscomplex = True
# If we are in NMR we have an additional complication due to the mode
# of acquisition (sequential mode when ['QSEQ','TPPI','STATES-TPPI'])
encoding = "undefined"
if not inv and "encoding" in new.meta:
encoding = new.meta.encoding[-1]
qsim = encoding in ["QSIM", "DQD"]
qseq = "QSEQ" in encoding
states = "STATES" in encoding
echoanti = "ECHO-ANTIECHO" in encoding
tppi = "TPPI" in encoding
qf = "QF" in encoding
zf_size(new, size=size, inplace=True)
# Perform the fft
if qsim: # F2 fourier transform
data = _fft(new.data)
elif qseq:
raise NotImplementedError("QSEQ not yet implemented")
elif states:
data = _states_fft(new.data, tppi)
elif tppi:
data = _tppi_fft(new.data)
elif echoanti:
data = _echoanti_fft(new.data)
elif qf:
# we must perform a real fourier transform of a time domain dataset
data = _qf_fft(new.data)
elif iscomplex and inv:
# We assume no special encoding for inverse complex fft transform
data = _ifft(new.data)
elif not iscomplex and not inv and is_ir:
# transform interferogram
data = _interferogram_fft(new.data)
elif not iscomplex and inv:
raise NotImplementedError("Inverse FFT for real dimension")
else:
raise NotImplementedError(
f"{encoding} not yet implemented. We recommend you to put an issue on "
f"Github, so we will not forget to work on this!.",
)
# We need here to create a new dataset with new shape and axis
new._data = data
new.mask = False
# create new coordinates for the transformed data
if is_nmr:
sfo1 = new.meta.sfo1[-1]
bf1 = new.meta.bf1[-1]
sf = new.meta.sf[-1]
sw = new.meta.sw_h[-1]
if new.meta.nuc1 is not None:
nuc1 = new.meta.nuc1[-1]
regex = r"([^a-zA-Z]+)([a-zA-Z]+)"
m = re.match(regex, nuc1)
if m is not None:
mass = m[1]
name = m[2]
nucleus = "^{" + mass + "}" + name
else:
nucleus = ""
else:
nucleus = ""
else:
sfo1 = 0 * ur.Hz
bf1 = sfo1
dw = x.spacing
if isinstance(dw, list):
pass # print()
sw = 1 / 2 / dw
sf = -sw / 2
size = size // 2
if not inv:
# time to frequency
sizem = max(size - 1, 1)
deltaf = -sw / sizem
first = sfo1 - sf - deltaf * sizem / 2.0
# newcoord = type(x)(np.arange(size) * deltaf + first)
newcoord = Coord.arange(size) * deltaf + first
newcoord.show_datapoints = False
newcoord.name = x.name
new.title = "intensity"
if is_nmr:
newcoord.title = f"${nucleus}$ frequency"
newcoord.ito("Hz")
elif is_ir:
new._units = None
newcoord.title = "wavenumbers"
newcoord.ito("cm^-1")
else:
newcoord.title = "frequency"
newcoord.ito("Hz")
else:
# frequency or ppm to time
sw = abs(x.data[-1] - x.data[0])
if x.units == "ppm":
sw = bf1.to("Hz") * sw / 1.0e6
deltat = (1.0 / sw).to("us")
newcoord = Coord.arange(size) * deltat
newcoord.name = x.name
newcoord.title = "time"
newcoord.ito("us")
if is_nmr and not inv:
newcoord.larmor = bf1 # needed for ppm transformation
ppm = kwargs.get("ppm", True)
if ppm:
newcoord.ito("ppm")
newcoord.title = rf"$\delta\ {nucleus}$"
new.coordset[dim] = newcoord
# update history
s = "ifft" if inv else "fft"
new.history = f"{s} applied on dimension {dim}"
# PHASE ?
iscomplex = new.is_complex or new.is_quaternion
if iscomplex and not inv:
# phase frequency domain
# if some phase related metadata do not exist yet, initialize them
new.meta.readonly = False
if not new.meta.phased:
new.meta.phased = [False] * new.ndim
if not new.meta.phc0:
new.meta.phc0 = [0] * new.ndim
if not new.meta.phc1:
new.meta.phc1 = [0] * new.ndim
if not new.meta.exptc:
new.meta.exptc = [0] * new.ndim
if not new.meta.pivot:
new.meta.pivot = [0] * new.ndim
# applied the stored phases
new.pk(inplace=True)
new.meta.pivot[-1] = abs(new).coordmax(dim=dim)
new.meta.readonly = True
# restore original data order if it was swapped
if swapped:
new.swapdims(axis, -1, inplace=True) # must be done inplace
return new
ft = fft
ift = ifft
# Modulus Calculation
[docs]
@_units_agnostic_method
def mc(dataset):
"""
Modulus calculation.
Calculates sqrt(real^2 + imag^2)
"""
return np.sqrt(dataset.real**2 + dataset.imag**2)
[docs]
@_units_agnostic_method
def ps(dataset):
"""
Power spectrum. Squared version.
Calculated real^2+imag^2
"""
return dataset.real**2 + dataset.imag**2
[docs]
@_units_agnostic_method
def ht(dataset, N=None):
"""
Hilbert transform.
Reconstruct imaginary data via hilbert transform.
Copied from NMRGlue (BSD3 licence).
Parameters
----------
data : ndarrat
Array of NMR data.
N : int or None
Number of Fourier components.
Returns
-------
ndata : ndarray
NMR data which has been Hilvert transformed.
"""
# create an empty output array
fac = N / dataset.shape[-1]
z = np.empty(dataset.shape, dtype=(dataset.flat[0] + dataset.flat[1] * 1.0j).dtype)
if dataset.ndim == 1:
z[:] = hilbert(dataset.real, N)[: dataset.shape[-1]] * fac
else:
for i, vec in enumerate(dataset):
z[i] = hilbert(vec.real, N)[: dataset.shape[-1]] * fac
# correct the real data as sometimes it changes
z.real = dataset.real
return z