Source code for spectrochempy.processing.fft.phasing

# ======================================================================================
# Copyright (©) 2015-2025 LCS - Laboratoire Catalyse et Spectrochimie, Caen, France.
# CeCILL-B FREE SOFTWARE LICENSE AGREEMENT
# See full LICENSE agreement in the root directory.
# ======================================================================================
"""NMR spectral processing functions which operate on the last dimension (1) of 2D arrays."""

__all__ = ["pk", "pk_exp"]
__dataset_methods__ = __all__

import functools

import numpy as np

from spectrochempy.application import error_
from spectrochempy.core.units import Quantity
from spectrochempy.core.units import ur

pi = np.pi


# ======================================================================================
# Decorators
# ======================================================================================
def _phase_method(method):
    @functools.wraps(method)
    def wrapper(dataset, **kwargs):
        # On which axis do we want to phase (get axis from arguments)
        axis, dim = dataset.get_axis(**kwargs, negative_axis=True)

        # output dataset inplace (by default) or not
        new = (
            dataset.copy() if not kwargs.pop("inplace", False) else dataset
        )  # copy to be sure not to modify this dataset

        swapped = False
        if axis != -1:
            new.swapdims(axis, -1, inplace=True)  # must be done in  place
            swapped = True

        # Get the coordinates for the last dimension
        x = new.coordset[dim]

        # check if the dimensionality is compatible with this kind of functions
        if x.unitless or x.dimensionless or x.units.dimensionality != "[time]":
            # extract initial phase from metadata
            def _check_units(par, default_units, inv=False):
                if not isinstance(par, Quantity):
                    par *= Quantity(1.0, default_units)
                elif inv:
                    if par == 0:
                        return par
                    par = 1.0 / (1.0 / par).to(default_units)
                else:
                    par = par.to(default_units)
                return par

            # Set correct units for the parameters
            dunits = dataset.coordset[dim].units

            current = [new.meta.phc0[-1], new.meta.phc1[-1]]
            rel = kwargs.pop("rel", False)
            if rel:  # relative phase
                current = [0, 0]
            kwargs["phc0"] = (
                _check_units(kwargs.get("phc0", 0), "degree") - current[0]
            ).magnitude
            kwargs["phc1"] = (
                _check_units(kwargs.get("phc1", 0), "degree") - current[1]
            ).magnitude
            kwargs["pivot"] = _check_units(
                kwargs.get("pivot", new.meta.pivot[-1]),
                dunits,
            ).magnitude
            kwargs["exptc"] = _check_units(
                kwargs.get("exptc", new.meta.get("exptc", [0] * new.ndim)[-1]),
                dunits,
                inv=True,
            ).magnitude

            if not new.meta.phased[-1]:
                # initial phase from topspin have not yet been used
                kwargs["phc0"] = -kwargs["phc0"]
                kwargs["phc1"] = -kwargs["phc1"]

            apod = method(new.data, **kwargs)
            new *= apod

            new.history = f"`{method.__name__}` applied to dimension `{dim}` with parameters: {kwargs}"

            if not new.meta.phased[-1]:
                new.meta.phased[-1] = True
                new.meta.phc0[-1] = 0 * ur.degree
                new.meta.phc1[-1] = 0 * ur.degree
                new.meta.exptc[-1] = 0 * (1 / dunits)
            else:
                if rel:
                    new.meta.phc0[-1] += kwargs["phc0"] * ur.degree
                    new.meta.phc1[-1] += kwargs["phc1"] * ur.degree
                else:
                    new.meta.phc0[-1] = kwargs["phc0"] * ur.degree
                    new.meta.phc1[-1] = kwargs["phc1"] * ur.degree

                    # TODO: to do for exptc too!
                new.meta.exptc[-1] = kwargs["exptc"] * (1 / dunits)

            new.meta.pivot[-1] = kwargs["pivot"] * dunits

        else:  # not (x.unitless or x.dimensionless or x.units.dimensionality != '[time]')
            error_(
                "This method apply only to dimensions with [frequency] or [dimensionless] dimensionality.\n"
                "Phase processing was thus cancelled",
            )

        # restore original data order if it was swapped
        if swapped:
            new.swapdims(axis, -1, inplace=True)  # must be done inplace

        return new

    return wrapper


# ======================================================================================
# Public methods
# ======================================================================================
[docs] @_phase_method def pk(dataset, phc0=0.0, phc1=0.0, exptc=0.0, pivot=0.0, **kwargs): """ Linear phase correction. For multidimensional NDDataset, the phase is by default applied on the last dimension. Parameters ---------- dataset : nddataset Input dataset. phc0 : float or `Quantity` , optional, default=0 degree Zero order phase in degrees. phc1 : float or `Quantity` , optional, default=0 degree First order phase in degrees. exptc : float or `Quantity` , optional, default=0 us Exponential decay constant. If not 0, phc1 is ignored. pivot: float or `Quantity` , optional, default=0 in units of the x coordinate Units if any must be compatible with last dimension units. Returns ------- phased Dataset. Other Parameters ---------------- dim : str or int, keyword parameter, optional, default='x'. Specify on which dimension to apply the phase. If `dim` is specified as an integer it is equivalent to the usual `axis` numpy parameter. inv : bool, keyword parameter, optional, default=False. True for inverse phasing. inplace : bool, keyword parameter, optional, default=False. True if we make the transform inplace. If False, the function return a new dataset. See Also -------- ps_exp : Exponential Phase Correction. pk : Automatic or manual phasing. """ phc0 = pi * phc0 / 180.0 size = dataset.shape[-1] if exptc > 0.0: apod = np.exp(1.0j * (phc0 * np.exp(-exptc * (np.arange(size) - pivot) / size))) else: phc1 = pi * phc1 / 180.0 apod = np.exp(1.0j * (phc0 + (phc1 * (np.arange(size) - pivot) / size))) return apod
[docs] def pk_exp(dataset, phc0=0.0, pivot=0.0, exptc=0.0, **kwargs): """ Exponential Phase Correction. For multidimensional NDDataset, the phase is by default applied on the last dimension. Parameters ---------- dataset : nddataset Input dataset. phc0 : float or `Quantity` , optional, default=0 degree Zero order phase in degrees. exptc : float or `Quantity` , optional, default=0 us Exponential decay constant. Returns ------- phased Dataset. Other Parameters ---------------- dim : str or int, keyword parameter, optional, default='x'. Specify on which dimension to apply the phase. If `dim` is specified as an integer it is equivalent to the usual `axis` numpy parameter. inv : bool, keyword parameter, optional, default=False. True for inverse phasing. inplace : bool, keyword parameter, optional, default=False. True if we make the transform inplace. If False, the function return a new dataset. See Also -------- ps : Linear Phase Correction. pk : Automatic or manual phasing. """ return pk(dataset, phc0=phc0, phc1=0, pivot=pivot, exptc=exptc)
# # TODO: work on pk (below a copy from MASAI) # @_phase_method # def _apk(source=None, options='', axis=-1): # """ # Automatic or manual phasing # # Parameters # ---------- # fit_phc1: bool, optional, default=False # also optimize first order phase if True # mode: String, optional, default='negmin' # method for automatic phase detection: 'negmin', 'entropy', ... # entropyd: int, optional, default=2 # order of derivation for method entropy # gamma: float, optional # relative weight for method entropy error with respect to negmin # axis: optional, default=-1 # # """ # # options evaluation # parser = argparse.ArgumentParser(description='PK processing.', usage=""" # pk [-h] [--auto] [--fit_phc1] [--ediff EDIFF] # [--gamma GAMMA] [--select {standard,max,cols}] # [--threshold THRESHOLD] [--mode MODE] # [--optmode {simplex,hopping}] # [--bound_phc0 BOUND_PHC0] [--bound_phc1 BOUND_PHC1] # [--verbose] # [phases [phases ...]] # """) # # global data, par, interact # # phc0 = None # phc1 = None # # # positional arguments # parser.add_argument('phases', default=(phc0, phc1), nargs='*', type=float, help='zero and first order phase') # parser.add_argument('--pivot', '-pv', default=None, type=float, help='pivot position in spectral units') # parser.add_argument('--interactive', '-i', default=None, nargs='*', # help='Interative mode on selected section here to check phase') # parser.add_argument('--pos', default=(0,), nargs='*', type=float, # help='row or column position where to check phase') # parser.add_argument('--shifted', default=0.0, type=float, help="position of the top in units of time") # parser.add_argument('--exp', '-ex', action='store_true', help='perform an exponential phase correction') # parser.add_argument('--auto', '-a', action='store_true', help='set to automatic phase mode') # parser.add_argument('--fit_phc1', '-u1', action='store_true', help='use phc1 in automatic phasing', ) # parser.add_argument('--ediff', '-ef', default=1, type=int, # help='order of the derivative for entropy calculation', ) # parser.add_argument('--gamma', '-ga', default=1.0, type=float, help='weight', ) # parser.add_argument('--select', '-st', default='standard', choices=['standard', 'max', 'cols', 'pos'], # help='selection mode in automatic phasing', ) # parser.add_argument('--threshold', '-th', default=50.0, type=int, # help='default threshold for columns selection', ) # parser.add_argument('--mode', '-m', default='negmin+entropy', help="position of the top in units of time") # parser.add_argument('--optmode', '-om', default='simplex', choices=['simplex', 'hopping'], # help="method of optimisation") # parser.add_argument('--bound_phc0', '-bp0', default=360., type=float, help="phc0 boundary") # parser.add_argument('--bound_phc1', '-bp1', default=10., type=float, help="phc1 boundary") # parser.add_argument('--byrow', '-br', action='store_true', help='to phase each row separately for series or 2D') # parser.add_argument('--verbose', action='store_true', help='verbose flag') # # parser.add_argument('--absolute', action='store_true', help='absolute flag: take the absolute value of phases') # # args = parser.parse_args(options.split()) # # if not source: # return # # if source: # # if axis == -1 or axis == 1: # par = source.par # else: # par = source.par2 # # if not par.isfreq: # print('This source is not already transformed: are you sure this is what you want?') # # # set data depending on the axis # # data = source.data # if axis == 0: # # transpose temporarily the data for indirect dimension ft # data = data.T # # # get initial phase and pivot # # get the initial phase setting # if DEBUG: # print('Current phases for axis %d : %f,%f' % (axis, par.PHC0, par.PHC1)) # # phc0, phc1 = args.phases # # if phc0 is None and phc1 is None: # # phase were not given, read stored phase # args.phases = phc0, phc1 = par.PHC0, par.PHC1 # # # relative phases # phc0, phc1 = phc0 - par.PHC0, phc1 - par.PHC1 # # # absolute phases # par.PHC0, par.PHC1 = args.phases # # # phases to apply # args.phases = phc0, phc1 # # # read pivot # ppivot = args.pivot # if ppivot is None: # ppivot = par.pivot # par.pivot = ppivot # # # we need to transform into index # pivot = position2index(data, ppivot) # args.pivot = pivot # # if DEBUG: # print('Phases demanded for axis %d : %f,%f with pivot: %f' % (axis, par.PHC0, par.PHC1, ppivot)) # print('Actual phases for axis %d to apply: %f,%f with pivot: %f' % (axis, phc0, phc1, ppivot)) # # sw = source.par.SW_h # p_shifted = args.shifted = 0.360 * args.shifted * sw # time are in ms # # # if the correction is exponential we need a second parameter tc=phc1 # if args.exp: # args.fit_phc1 = True # # # INTERACTIVE MODE --------------------- # if args.interactive is not None: # # if DEBUG: # print('INTERACTIVE PHASING MODE') # # interact = args.interactive # if interact == []: # interact = ['0'] # # # interactive phasing # # ps0, ps1 = par.PHC0, par.PHC1 # # def phasing(ph0, ph1, pivot): # # global interact # # # set data depending on the axis # data = source.data.copy() # if axis == 0: # # transpose temporarily the data for indirect dimension ft # data = data.T # # if axis == -1 or axis == 1: # par = source.par # else: # par = source.par2 # # p0 = (ph0 - ps0) * np.pi / 180. # convert to radians # p1 = (ph1 - ps1) * np.pi / 180. # # size = data.shape[-1] # pivot_ = position2index(data, pivot) # apod = np.exp(1.0j * (p0 + (p1 * (np.arange(size) - pivot_) / size))) # data = apod * data # fig = pl.figure(figsize=(4, 2)) # ax = fig.add_subplot(1, 1, 1) # ax.set_xlim((data.columns.max(), data.columns.min())) # # if interact[0] == 'max': # i, j = np.unravel_index(np.abs(data.values).argmax(), data.values.shape) # # dat = data.values[i] # dat = getsection(data, i, width=1., axis=0) # ax.plot(data.columns, dat.values) # else: # for item in interact: # pos = float(item) # i = position2index(data.T, pos) # index of a row # dat = data.values[i] # ax.plot(data.columns, dat) # # ax.axvline(pivot, color='r', lw='.1') # ax.axhline(0, color='g', lw='.1') # pl.show() # # print('ph0: {} ph1: {} pivot: {} (PHC0: {}'.format(ph0, ph1, pivot, # par.PHC0) + '\nWARNING THESE PHASE VALUES ARE NOT ' # STORED IN THE SOURCE!' + '\nSo you must ' # 'use them in ' # 'another `pk` ' # 'command)') # # w = interactive(phasing, # ph0=FloatSlider(min=ps0 - 45, max=ps0 + 45, step=0.001, value=ps0, continuous_update=False), # ph1=FloatSlider(min=ps1 - 180, max=ps1 + 180, step=0.01, value=ps1, continuous_update=False), # pivot=FloatSlider(min=data.columns.min(), max=data.columns.max(), step=0.001, value=ppivot, # continuous_update=False)) # # output = w.children[-1] # display(w) # # return w # # # Manual mode ------------------------------ # # elif not args.auto: # # if DEBUG: # print('MANUAL PHASE MODE') # # # not interactive and not automatic leaves here... # data = ps(data.values, phc0, phc1 + p_shifted, pivot=pivot, # is_exp=args.exp) # this return an array not a dataframe # source.history.append('Manual phasing phc0:%.2f, phc1:%.2f, pivot:%.2f ' % (par.PHC0, par.PHC1, par.pivot)) # # # Automatic mode ------------------------------ # else: # if DEBUG: # print('AUTOMATIC PHASE MODE') # # # APK... # if args.select == 'cols' and source.is_2d and axis == 0: # ar = picking(source, args.threshold, index=True) # args.cols = zip(*ar)[1] # # if args.byrow and source.is_2d: # rows = [] # for index in range(data.index.size): # row = data.iloc[index:index + 1].values # row, phc0, phc1 = autophase(row, args) # row = row - basecorr(row) # rows.append(row) # # merge all rows to recreate data # data = np.vstack(rows) # else: # data, phc0, phc1 = autophase(data.values, args, par) # return an array not a dataframe # # atxt = '(not optimized)' if not args.fit_phc1 else '' # # sbyrow = 'byrow' if args.byrow else '' # source.history.append( # 'Auto-phasing %s: phc0 = %.3f, phc1%s = %.3f, pivot:%.2f' % (sbyrow, phc0, atxt, phc1, pivot)) # # store the new phases # par.PHC0, par.PHC1 = phc0, phc1 # # try: # data = data - basecorr(data) # except Exception: # pass # # # un-transpose the data if needed # if axis == 0: # data = data.T # # source.data = data # # # # # # autophase # # # def _checkin(ph, bp): # if ph < -bp: # ph = -bp # if ph > bp: # ph = bp # return ph # # # def _autophase(data, args, par): # """ # Automatic phasing of 1D or 2D spectra # """ # # dat = data.copy() # # # get parameters # # -------------- # phc0, phc1 = args.phases # pivot = args.pivot # is_exp = args.exp # verbose = args.verbose # # if DEBUG: # print('performing autophase') # # select = args.select # # if select == 'standard' or data.shape[0] == 1: # # select the first row # dat = dat[0] # select = 'standard' # # if select == 'max' and data.ndim > 1: # i, j = np.unravel_index(data.real.argmax(), data.shape) # dat = data[i] # # if select == 'cols' and data.ndim > 1: # l = [] # cols = args.cols # if verbose: # print('columns selected:', cols) # for col in cols: # i = int(col) # l.append(data[i]) # dat = np.vstack(l) # # # apkmode # # -------- # if DEBUG: # print('\nselect: ', select) # # mode = args.mode # ediff = args.ediff # gamma = args.gamma # optmode = args.optmode # p_shifted = args.shifted # # prepare the phase to optimized with bound # fp = FitParameters() # bp0 = args.bound_phc0 # bp1 = args.bound_phc1 # phc0 = _checkin(phc0, bp0) # # fixed is false # fp['phc0'] = phc0, -bp0, bp0, False # # fit_phc1 = args.fit_phc1 # phc1 = _checkin(phc1, bp1) # fixed = not fit_phc1 # fp['phc1'] = phc1, -bp1, bp1, fixed # # # optim # global ni, nas, spe, err, niter # ni = 0 # err = 0 # niter = 0 # # # internal error function ---------------------------------------------------------------------------------- # def _phase_error(p, s): # # global nas, spe, ni, err # # p0 = p['phc0'] # p1 = p['phc1'] # sc = s.copy() # sc = ps(sc, p0, p1 + p_shifted, pivot=pivot, is_exp=is_exp) # # # baseline correction # scp = sc - basecorr(sc) # # # Negative area minimization # nas = 0. # nam = 0. # if "negmin" in mode: # fm = scp[scp.real <= 0].real # fm = fm - np.min(fm) # nas = np.sum(fm ** 2) # # # normalisation # fp = scp.real # fp = fp - np.min(fm) # nas = nas * 100. / (np.sum(fp ** 2) + 1.e-30) # # # entropy minimization # spe = 0. # if 'entropy' in mode: # h = np.diff(scp.real, ediff) # h = np.abs(h) # h = abs(h / np.sum(np.abs(h))) + 1.e-30 # spe = -np.sum(h * np.log(h)) # ni += 1 # err = nas + spe * gamma # # return err # # # end _phase_error function ------------------------------------------------------------------------------------- # # # callback function-------------------------------------------------------- # def callback(x, f=None, accepted=None): # """ # callback print function # """ # global niter, err # return # # if f is not None: # hopping # pass # if f is None: # simplex # # display.clear_output(wait=True) # # msg = ("Iteration: %d (chi2: %.5f)" % (niter, err) + # ' Negative area (NA):%.3g Entropy (S) * gamma:%.3g' % ( # nas, spe * gamma)) # auto_close_message(msg, title='Information', time=.001) # # niter += 1 # # # end callback function --------------------------------------------------- # # # convergence is not insured depending on the starting values # fp, err1 = optimize(_phase_error, fp, args=[dat, ], method=optmode, callback=None) # nas_save = nas # spe_save = spe # fp_save = fp.copy() # # if optmode.upper() != 'HOPPING': # fp['phc0'] = (fp['phc0'] - 180.0) % 360.0, -bp0, bp0, False # ni = 0 # fp, err2 = optimize(_phase_error, fp, args=(dat,), method=optmode, callback=callback) # # # select the best # if err2 > err1: # fp = fp_save.copy() # nas = nas_save # spe = spe_save # # # extract results # phc0 = fp['phc0'] # phc1 = fp['phc1'] # # if verbose: # print('Phase:') # # if fit_phc1: # if verbose: # print('phc0: %.3f' % phc0) # print('phc1: %.3f' % phc1) # else: # if verbose: # print('phc0: %.3f' % phc0) # print('phc1: %.3f (not optimized)' % phc1) # # if verbose: # print('Negative area (NA):%.3g Entropy (S) * gamma:%.3g' % (nas, spe * gamma)) # # # apply to the original data and return # data = pk(data, phc0, phc1 + p_shifted, pivot=pivot, is_exp=is_exp) # return data, phc0, phc1