Source code for pyActigraphy.analysis.fractal

# (Multi-fractal) Detrended fluctuation analysis
from joblib import Parallel, delayed
from numba import njit  # , prange
import pandas as pd
import numpy as np
from numpy.polynomial import polynomial
from scipy.stats import linregress


@njit
def _profile(X):
    r'''Detrend (remove the global mean) and integrate the input array.'''

    trend = np.nanmean(X)

    prof = np.nancumsum(X - trend)

    return prof


@njit
def _rolling_window(x, n):
    r'''Split the input array into an array of window-sized arrays, shifted
    by one element. Emulates the rolling function of pandas.Series.

    Parameters
    ----------
    x : (N,) array_like
        Input
    n : int
        Size of the rolling window
    Returns
    -------
    roll : (N,n) array_like
        Array containing the successive windows.
    '''

    shape = x.shape[:-1] + (x.shape[-1] - n + 1, n)
    strides = x.strides + (x.strides[-1],)
    return np.lib.stride_tricks.as_strided(x, shape=shape, strides=strides)
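
# Illustrative behaviour of _rolling_window (output shown under the
# assumption that the numba-compiled helper behaves like its pure-NumPy
# equivalent):
#
#     >>> _rolling_window(np.arange(5, dtype=float), 3)
#     array([[0., 1., 2.],
#            [1., 2., 3.],
#            [2., 3., 4.]])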


@njit  # (float64[:,:](float64[:],int64,boolean))
def _segmentation(x, n, backward=False, overlap=False):
    r'''Split the input array into consecutive windows of size n. By default,
    the windows do not overlap; optionally, they overlap by 50%.

    Parameters
    ----------
    x : (N,) array_like
        Input.
    n : int
        Window size
    backward: bool, optional
        If set to True, runs the segmentation from the end of the input array.
        Default is False.
    overlap: bool, optional
        If set to True, consecutive windows overlap by 50%.
        Default is False.
    Returns
    -------
    seg : (N,n) array_like
        Array containing the successive windows.
    '''

    # Compute consecutive windows, shifted by one element
    windows = _rolling_window(x, n)

    # Compute distance between the center of two consecutive windows
    stride = n//2 if overlap else n

    # Select the segments, starting from the end of the array if backward
    if backward:
        segments = windows[::-stride]
    else:
        segments = windows[::stride]

    return segments


class Fractal():
    r'''Class for Fractality Analysis

    This class implements methods used to perform a (multifractal) detrended
    fluctuation analysis, (MF)DFA. The implementation follows the original
    description made in [1]_ and [2]_. The (MF)DFA consists of the following
    steps:

    1. removing the global mean and integrating the time series of a signal:

       .. math::

           X_{t} = \sum_{i=1}^{t}(x_i - \bar{x})

       where :math:`\bar{x}` denotes the mean value of the time series
       :math:`\{x_i\}_{i\in[1:N]}`;

    2. dividing the integrated signal into non-overlapping windows of the
       same chosen size n;

    3. detrending the integrated signal in each window using polynomial
       functions to obtain residuals, that is:

       .. math::

           \widehat{X_t} = X_{t} - Y_{t}

       where :math:`Y_t` denotes the trend obtained by polynomial fit and
       :math:`\widehat{X_t}` the integrated time series after detrending;

    4. calculating the root mean square of residuals in all windows as the
       detrended fluctuation amplitude :math:`F_q(n)`, that is:

       .. math::

           F_q(n) = \left[\frac{1}{N}
                    \sum_{t=1}^N \widehat{X_t}^q\right]^{1/q}

    For :math:`q=2`, the usual DFA is retrieved.

    In the context of actigraphy, further information can be found in:

    * Hu, K., Ivanov, P. C., Chen, Z., Hilton, M. F., Stanley, H. E., &
      Shea, S. A. (2004). Non-random fluctuations and multi-scale dynamics
      regulation of human activity. Physica A: Statistical Mechanics and Its
      Applications, 337(1–2), 307–318.
      https://doi.org/10.1016/j.physa.2004.01.042

    References
    ----------

    .. [1] Peng, C.-K., Buldyrev, S. V., Havlin, S., Simons, M.,
           Stanley, H. E., & Goldberger, A. L. (1994). Mosaic organization of
           DNA nucleotides. Physical Review E, 49(2), 1685–1689.
           https://doi.org/10.1103/PhysRevE.49.1685
    .. [2] Kantelhardt, J. W., Zschiegner, S. A., Koscielny-Bunde, E.,
           Havlin, S., Bunde, A., & Stanley, H. E. (2002). Multifractal
           detrended fluctuation analysis of nonstationary time series.
           Physica A: Statistical Mechanics and Its Applications, 316(1–4),
           87–114. https://doi.org/10.1016/S0378-4371(02)01383-3
    '''

    def __init__(self, n_array=None, q_array=None):

        self.__n_array = n_array
        self.__q_array = q_array
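
    # The four steps above map onto the methods below: `profile` implements
    # step 1, `segmentation` step 2, `local_msq_residuals` step 3, and
    # `fluctuations` together with `q_th_order_mean_square` step 4.
    # A minimal, illustrative sketch on a synthetic array (variable names are
    # hypothetical, not part of the original module):
    #
    #     >>> import numpy as np
    #     >>> X = np.random.uniform(0, 100, size=1000)
    #     >>> Y = Fractal.profile(X)                        # step 1
    #     >>> segments = Fractal.segmentation(Y, n=100)     # step 2
    #     >>> F = Fractal.fluctuations(X, n=100, deg=1)     # steps 2-3
    #     >>> F_2 = Fractal.q_th_order_mean_square(F, q=2)  # step 4 (DFA, q=2)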

    @classmethod
    def profile(cls, X):
        r'''Profile function

        Detrend and integrate the signal.

        Parameters
        ----------
        X: numpy.array
            Input array.

        Returns
        -------
        prof: numpy.array
            Integrated time series, after removal of the global mean.
        '''

        return _profile(X)

    @classmethod
    def segmentation(cls, x, n, backward=False, overlap=False):
        r'''Segmentation function

        Segment the signal into non-overlapping windows of equal size.

        Parameters
        ----------
        x: numpy.array
            Input array.
        n: int
            Window size.
        backward: bool, optional
            If set to True, start segmentation from the end of the signal.
            Default is False.
        overlap: bool, optional
            If set to True, consecutive windows overlap by 50%.
            Default is False.

        Returns
        -------
        segments: numpy.array
            Non-overlapping windows of size n.
        '''

        return _segmentation(x, n, backward, overlap)
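
    # Illustrative behaviour of `segmentation` (output shown under the
    # assumption that the numba-compiled helpers behave like their pure-NumPy
    # equivalents):
    #
    #     >>> x = np.arange(12, dtype=float)
    #     >>> Fractal.segmentation(x, 4)
    #     array([[ 0.,  1.,  2.,  3.],
    #            [ 4.,  5.,  6.,  7.],
    #            [ 8.,  9., 10., 11.]])
    #     >>> Fractal.segmentation(x, 4, backward=True)[0]
    #     array([ 8.,  9., 10., 11.])
    #     >>> len(Fractal.segmentation(x, 4, overlap=True))
    #     5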

    @classmethod
    def local_msq_residuals(cls, segment, deg):
        r'''Mean squared residuals

        Mean squared residuals of the least squares polynomial fit.

        Parameters
        ----------
        segment: numpy.array
            Input array.
        deg: int
            Degree(s) of the fitting polynomials.

        Returns
        -------
        residuals_msq: numpy.float
            Mean squared residuals.
        '''

        # Segment length
        n = len(segment)

        # X-axis
        x = np.linspace(1, n, n)

        # Fit the data
        _, fit_result = polynomial.polyfit(y=segment, x=x, deg=deg, full=True)

        # Return mean squared residuals
        return fit_result[0][0]/n if fit_result[0].size != 0 else np.nan
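
    # For instance, a perfectly linear segment leaves (numerically) no
    # residuals when detrended with a polynomial of degree 1, whereas a
    # quadratic trend does (illustrative sketch):
    #
    #     >>> seg = np.arange(10, dtype=float)
    #     >>> Fractal.local_msq_residuals(seg, deg=1)      # ~0.0, up to rounding
    #     >>> Fractal.local_msq_residuals(seg**2, deg=1)   # > 0: curvature remains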

    @classmethod
    def fluctuations(cls, X, n, deg, overlap=False):
        r'''Fluctuation function

        The fluctuations are defined as the mean squared residuals of the
        least squares fit in each non-overlapping window.

        Parameters
        ----------
        X: numpy.array
            Array of activity counts.
        n: int
            Window size.
        deg: int
            Degree(s) of the fitting polynomials.
        overlap: bool, optional
            If set to True, consecutive windows during segmentation overlap
            by 50%.
            Default is False.

        Returns
        -------
        F: numpy.array
            Array of mean squared residuals in each segment.
        '''

        # Detrend and integrate time series
        Y = cls.profile(X)

        # Define non-overlapping segments
        segments_fwd = cls.segmentation(Y, n, backward=False, overlap=overlap)
        segments_bwd = cls.segmentation(Y, n, backward=True, overlap=overlap)

        # Assert equal numbers of segments
        assert segments_fwd.shape == segments_bwd.shape

        F = np.empty(len(segments_fwd)+len(segments_bwd))

        # Compute the mean squared residuals for each segment
        for i in range(len(segments_fwd)):
            F[i*2] = cls.local_msq_residuals(segments_fwd[i], deg)
            F[i*2+1] = cls.local_msq_residuals(segments_bwd[i], deg)

        return F
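
    # Since the profile is segmented both forward and backward, `fluctuations`
    # returns two mean squared residuals per window position. Illustrative
    # sketch (hypothetical input):
    #
    #     >>> X = np.random.uniform(0, 100, size=1000)
    #     >>> F = Fractal.fluctuations(X, n=100, deg=1)
    #     >>> F.shape   # 10 forward + 10 backward segments
    #     (20,)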

    @classmethod
    def q_th_order_mean_square(cls, F, q):
        r'''Qth-order mean square function

        Compute the q-th order mean squares.

        Parameters
        ----------
        F: numpy.array
            Array of fluctuations.
        q: scalar
            Order.

        Returns
        -------
        qth_msq: numpy.float
            Q-th order mean square.
        '''

        if q == 0:
            qth_msq = np.exp(0.5*np.nanmean(np.log(F)))
        else:
            qth_msq = np.power(np.nanmean(np.power(F, q/2)), 1/q)

        return qth_msq
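
    # For q=2, this reduces to the root mean square of the fluctuations, while
    # q=0 is handled through its logarithmic limit. Worked example
    # (illustrative):
    #
    #     >>> F = np.array([1.0, 4.0])
    #     >>> Fractal.q_th_order_mean_square(F, q=2)  # sqrt(mean(F)) ~ 1.581
    #     >>> Fractal.q_th_order_mean_square(F, q=0)  # exp(mean(log(F))/2) ~ 1.414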

    @classmethod
    def dfa(cls, ts, n_array, deg=1, overlap=False, log=False):
        r'''Detrended Fluctuation Analysis function

        Compute the q-th order mean squared fluctuations for different
        segment lengths.

        Parameters
        ----------
        ts: pandas.Series
            Input signal.
        n_array: array of int
            Time scales (i.e. window sizes). In minutes.
        deg: int, optional
            Degree(s) of the fitting polynomials.
            Default is 1.
        overlap: bool, optional
            If set to True, consecutive windows during segmentation overlap
            by 50%.
            Default is False.
        log: bool, optional
            If set to True, returned values are log-transformed.
            Default is False.

        Returns
        -------
        q_th_order_msq_fluc: numpy.array
            Array of q-th order mean squared fluctuations.
        '''

        # Check sampling frequency
        freq = ts.index.freq
        if freq is None:
            raise ValueError(
                "Cannot check the sampling frequency of the input data.\n"
                "Might be the case for unevenly spaced data or masked data.\n"
                "Please, consider using pandas.Series.resample."
            )
        else:
            factor = pd.Timedelta('1min')/freq

        iterable = (
            cls.q_th_order_mean_square(
                cls.fluctuations(
                    ts.values, n=int(n), deg=deg, overlap=overlap
                ),
                q=2
            ) for n in factor*n_array
        )

        q_th_order_msq_fluc = np.fromiter(
            iterable,
            dtype='float',
            count=len(n_array)
        )

        if log:
            q_th_order_msq_fluc = np.log(q_th_order_msq_fluc)

        return q_th_order_msq_fluc
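
    # A minimal usage sketch of `dfa` on an evenly sampled pandas.Series
    # (variable names are hypothetical; the import path assumes `Fractal` is
    # exposed by `pyActigraphy.analysis`):
    #
    #     >>> import pandas as pd
    #     >>> from pyActigraphy.analysis import Fractal
    #     >>> ts = pd.Series(
    #     ...     np.random.uniform(0, 100, size=7*1440),
    #     ...     index=pd.date_range('2020-01-01', periods=7*1440, freq='1min')
    #     ... )
    #     >>> n_array = Fractal.equally_spaced_logscale_range(50, start=10, stop=1440)
    #     >>> F_n = Fractal.dfa(ts, n_array, deg=1)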

    @classmethod
    def dfa_parallel(
        cls, ts, n_array, deg=1, overlap=False, log=False,
        n_jobs=2, prefer=None, verbose=0
    ):
        r'''Detrended Fluctuation Analysis function

        Compute, in parallel, the q-th order mean squared fluctuations for
        different segment lengths.

        Parameters
        ----------
        ts : pandas.Series
            Input signal.
        n_array: array of int
            Time scales (i.e. window sizes). In minutes.
        deg: int, optional
            Degree(s) of the fitting polynomials.
            Default is 1.
        overlap: bool, optional
            If set to True, consecutive windows during segmentation overlap
            by 50%.
            Default is False.
        log: bool, optional
            If set to True, returned values are log-transformed.
            Default is False.
        n_jobs: int, optional
            Number of CPUs to use for parallel fitting.
            Default is 2.
        prefer: str, optional
            Soft hint to choose the default backend.
            Supported options: 'processes', 'threads'.
            See the joblib package documentation for more info.
            Default is None.
        verbose: int, optional
            Display a progress meter if set to a value > 0.
            Default is 0.

        Returns
        -------
        q_th_order_msq_fluc: numpy.array
            Array of q-th order mean squared fluctuations.
        '''

        # Check sampling frequency
        freq = ts.index.freq
        if freq is None:
            raise ValueError(
                "Cannot check the sampling frequency of the input data.\n"
                "Might be the case for unevenly spaced data or masked data.\n"
                "Please, consider using pandas.Series.resample."
            )
        else:
            factor = pd.Timedelta('1min')/freq

        flucts = Parallel(
            n_jobs=n_jobs, prefer=prefer, verbose=verbose
        )(delayed(cls.fluctuations)(
            ts.values, n=int(n), deg=deg, overlap=overlap
        ) for n in factor*n_array)

        q_th_order_msq_fluc = np.fromiter(
            (cls.q_th_order_mean_square(fluct, q=2) for fluct in flucts),
            dtype='float',
            count=len(flucts)
        )

        if log:
            q_th_order_msq_fluc = np.log(q_th_order_msq_fluc)

        return q_th_order_msq_fluc
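
    # `dfa_parallel` distributes the per-scale fluctuation computations over
    # joblib workers and should return the same values as `dfa`. Illustrative
    # call, reusing the hypothetical `ts` and `n_array` from the sketch above:
    #
    #     >>> F_n = Fractal.dfa_parallel(
    #     ...     ts, n_array, deg=1, n_jobs=4, prefer='threads'
    #     ... )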

    @classmethod
    def generalized_hurst_exponent(
        cls, F_n, n_array, x_center=False, log=False
    ):
        r'''Generalized Hurst exponent

        Compute the generalized Hurst exponent, :math:`h(q)`, by fitting a
        straight line to :math:`\log F(n)` as a function of :math:`\log n`.

        Parameters
        ----------
        F_n : array
            Array of fluctuations.
        n_array: array of int
            Time scales (i.e. window sizes). In minutes.
        x_center: bool, optional
            If set to True, time scales are mean-centered.
            Default is False.
        log: bool, optional
            If set to True, assume that the input values have already been
            log-transformed.
            Default is False.

        Returns
        -------
        h, h_err: float, float
            Estimate of the generalized Hurst exponent and its standard
            error.
        '''

        y = np.log(F_n) if not log else F_n
        x = np.log(n_array) if not log else n_array

        if x_center:
            x = x - np.mean(x)

        r = linregress(y=y, x=x)

        return r.slope, r.stderr
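
    # Typical use: estimate the scaling exponent from the output of `dfa`
    # (illustrative; for the white-noise series sketched above, the estimate
    # is expected to be close to 0.5):
    #
    #     >>> h, h_err = Fractal.generalized_hurst_exponent(F_n, n_array)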

    @classmethod
    def local_slopes(cls, F_n, n_array, s=2, log=False, verbose=False):
        r'''Local slopes of log(F(n)) versus log(n).

        The local slope of the curve log(F(n)) is calculated at each point by
        fitting a polynomial of degree 1, using the 2*s surrounding points.
        The first and last s points, for which fewer surrounding points are
        available, are excluded from the output.

        Parameters
        ----------
        F_n : array
            Array of fluctuations.
        n_array: array of int
            Time scales (i.e. window sizes). In minutes.
        s: int, optional
            Half size of the window used to estimate the local slope. The
            total window size is (2*s+1).
            Default is 2.
        log: bool, optional
            If set to True, assume that the input values have already been
            log-transformed.
            Default is False.
        verbose: bool, optional
            If set to True, display information about the calculations.
            Default is False.

        Returns
        -------
        alpha_loc, alpha_loc_err, n_x: arrays of floats
            Local slopes (alpha_loc), and associated uncertainties, obtained
            for various time scales n_x.
        '''

        # Check if inputs have the same dimension
        assert len(F_n) == len(n_array)

        # Window size: 2*s + 1
        win_size = 2*s + 1

        # Check if the number of points for a single linear fit is at least 3
        if win_size < 3:
            raise ValueError(
                "Cannot perform a linear fit on series of less than"
                " 3 points; `s` must be greater or equal to 1."
            )

        # Check if the number of points to fit is less than (2*s+1)
        if len(n_array) < win_size:
            raise ValueError(
                "Total number of points to fit is less than (2*s+1)."
            )

        n_x = np.empty(len(n_array)-2*s)
        alpha_loc = np.empty(len(n_array)-2*s)
        alpha_loc_err = np.empty(len(n_array)-2*s)

        for t in np.arange(0, len(n_array)-win_size+1):
            # Fit the series of points (F(n) vs n) in the window centred on
            # the point n_x
            if verbose:
                print("-"*50)
                print("Fit info:")
                print("- t: {}".format(t))
                print("- current point: {}".format(t+s))
                print("- F_n[t:t+2*s+1]: {}".format(F_n[t:t+win_size]))
                print("- n_array[t:t+2*s+1]: {}".format(n_array[t:t+win_size]))
                print("- array center: {}".format(n_array[t+s]))
            alpha_loc[t], alpha_loc_err[t] = cls.generalized_hurst_exponent(
                F_n[t:t+win_size], n_array[t:t+win_size], log=log
            )
            n_x[t] = n_array[t+s]

        if log:
            n_x = np.exp(n_x)

        return alpha_loc, alpha_loc_err, n_x
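
    # Illustrative call: local scaling exponents estimated in sliding windows
    # of 2*s+1 points along the fluctuation function, reusing the hypothetical
    # `F_n` and `n_array` from the sketches above:
    #
    #     >>> alpha, alpha_err, n_x = Fractal.local_slopes(F_n, n_array, s=2)
    #     >>> len(alpha) == len(n_array) - 2*2
    #     True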

    @classmethod
    def mfdfa(cls, ts, n_array, q_array, deg=1, overlap=False, log=False):
        r'''Multifractal Detrended Fluctuation Analysis function

        Compute the q-th order mean squared fluctuations for different
        segment lengths and different index q values.

        Parameters
        ----------
        ts: pandas.Series
            Input signal.
        n_array: array of int
            Time scales (i.e. window sizes). In minutes.
        q_array: array of float
            Orders of the mean squares.
        deg: int, optional
            Degree(s) of the fitting polynomials.
            Default is 1.
        overlap: bool, optional
            If set to True, consecutive windows during segmentation overlap
            by 50%.
            Default is False.
        log: bool, optional
            If set to True, returned values are log-transformed.
            Default is False.

        Returns
        -------
        q_th_order_msq_fluctuations: numpy.array
            (n,q) array of q-th order mean squared fluctuations.
        '''

        # Check sampling frequency
        freq = ts.index.freq
        if freq is None:
            raise ValueError(
                "Cannot check the sampling frequency of the input data.\n"
                "Might be the case for unevenly spaced data or masked data.\n"
                "Please, consider using pandas.Series.resample."
            )
        else:
            factor = pd.Timedelta('1min')/freq

        q_th_order_msq_fluctuations = np.empty(
            (len(n_array), len(q_array)), dtype='float'
        )

        for idx, n in enumerate(factor*n_array):
            fluct = cls.fluctuations(
                ts.values, n=int(n), deg=deg, overlap=overlap
            )
            q_th_order_msq_fluctuations[idx] = [
                cls.q_th_order_mean_square(fluct, q=q) for q in q_array
            ]

        if log:
            q_th_order_msq_fluctuations = np.log(q_th_order_msq_fluctuations)

        return q_th_order_msq_fluctuations
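
    # A minimal multifractal sketch: compute the fluctuation functions for a
    # range of q values and estimate h(q) column by column (hypothetical
    # variable names, reusing `ts` and `n_array` from the sketches above):
    #
    #     >>> q_array = np.linspace(-5, 5, 11)
    #     >>> F_nq = Fractal.mfdfa(ts, n_array, q_array, deg=1)
    #     >>> h_q = [
    #     ...     Fractal.generalized_hurst_exponent(F_nq[:, i], n_array)[0]
    #     ...     for i in range(len(q_array))
    #     ... ]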

    @classmethod
    def mfdfa_parallel(
        cls, ts, n_array, q_array, deg=1, overlap=False, log=False,
        n_jobs=2, prefer=None, verbose=0
    ):
        r'''Multifractal Detrended Fluctuation Analysis function

        Compute, in parallel, the q-th order mean squared fluctuations for
        different segment lengths and different index q values.

        Parameters
        ----------
        ts : pandas.Series
            Input signal.
        n_array: array of int
            Time scales (i.e. window sizes). In minutes.
        q_array: array of float
            Orders of the mean squares.
        deg: int, optional
            Degree(s) of the fitting polynomials.
            Default is 1.
        overlap: bool, optional
            If set to True, consecutive windows during segmentation overlap
            by 50%.
            Default is False.
        log: bool, optional
            If set to True, returned values are log-transformed.
            Default is False.
        n_jobs: int, optional
            Number of CPUs to use for parallel fitting.
            Default is 2.
        prefer: str, optional
            Soft hint to choose the default backend.
            Supported options: 'processes', 'threads'.
            See the joblib package documentation for more info.
            Default is None.
        verbose: int, optional
            Display a progress meter if set to a value > 0.
            Default is 0.

        Returns
        -------
        q_th_order_msq_fluctuations: numpy.array
            (n,q) array of q-th order mean squared fluctuations.
        '''

        # Check sampling frequency
        freq = ts.index.freq
        if freq is None:
            raise ValueError(
                "Cannot check the sampling frequency of the input data.\n"
                "Might be the case for unevenly spaced data or masked data.\n"
                "Please, consider using pandas.Series.resample."
            )
        else:
            factor = pd.Timedelta('1min')/freq

        flucts = Parallel(
            n_jobs=n_jobs, prefer=prefer, verbose=verbose
        )(delayed(cls.fluctuations)(
            ts.values, n=int(n), deg=deg, overlap=overlap
        ) for n in factor*n_array)

        q_th_order_msq_fluctuations = np.array([
            cls.q_th_order_mean_square(fluct, q=q)
            for fluct in flucts for q in q_array
        ]).reshape(len(n_array), len(q_array))

        if log:
            q_th_order_msq_fluctuations = np.log(q_th_order_msq_fluctuations)

        return q_th_order_msq_fluctuations

    @staticmethod
    def equally_spaced_logscale_range(n, start=1, stop=1440):
        """Equally spaced numbers in log-scale

        Construct a series of equally spaced numbers in log scale.

        Parameters
        ----------
        n: int
            Number of values to generate.
        start: int, optional
            Starting number.
            Default is 1.
        stop: int, optional
            End number.
            Default is 1440.

        Returns
        -------
        n_array: numpy.array of int
            Array of equally spaced numbers.
        """

        # Create series of numbers evenly spaced in log-space, cast as int
        units = np.geomspace(start, stop, num=n, endpoint=True, dtype=int)

        # Remove duplicates
        n_array = np.unique(units)

        # Filter out numbers < start
        n_array = n_array[np.where(n_array >= start)]

        return n_array
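
    # Illustrative call: generate integer time scales that are approximately
    # evenly spaced on a log axis; duplicates created by the integer cast are
    # removed, so the returned array may be shorter than `n`:
    #
    #     >>> Fractal.equally_spaced_logscale_range(4, start=1, stop=1000)
    #     # -> roughly [1, 10, 100, 1000] (the integer cast may shift values by 1)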