import numpy as np
import matplotlib
import matplotlib.pylab as plt
from matplotlib.animation import FuncAnimation
from scipy import optimize
from scipy.ndimage import gaussian_filter
import scipy.stats as cpy
import numba
import emcee
[docs]class LangevinEstimation:
'''
The ``LangevinEstimation`` class includes tools to estimate a polynomial Langevin equation with various
drift and diffusion terms and provides the drift slope :math:`\zeta` as a resilience measure.
:param data: A one dimensional numpy array containing the times series to analyse.
:type data: One dimensional numpy array of floats
:param time: A one dimensional numpy array containing the time samples of the time series data.
:type time: One dimensional numpy array of floats
:param drift_model: Defines the drift model. Default is ``'3rd order polynomial'``.
Additionally, a ``'first order polynomial'`` can be chosen.
:type drift_model: str
:param diffusion_model: Defines the diffusion model. Default is ``'constant'``.
Additionally, a ``'first order polynomial'`` can be chosen.
:type diffusion_model: str
:param prior_type: Defines the used prior to calculate the posterior distribution of the data.
Default is ``'Non-informative linear and Gaussian Prior'``. A flat prior can be chosen
with ``'Flat Prior'``.
:type drift_type: str
:param prior_range: Customize the prior range of the estimated polynomial parameters :math:`\theta_i`
starting for :math:`\theta_0` in row with index 0 and upper limits in the first column.
Default is ``None`` which means prior ranges of -50 to 50 for the drift parameters and
0 to 50 for constant diffusion terms.
:type prior_range: Two-dimensional numpy array of floats, optional
:param scales: Two tailed percentiles to create credibility bands of the estimated measures.
:type scales: One-dimensional numpy array with two float entries
:param theta: Array that contains the estimated drift and diffusion parameters with drift and ending with diffusion.
The lowest order is mentioned first.
:type theta: One-dimensional numpy array of floats
:param ndim: Total number of parameters to estimate.
:type ndim: int
:param nwalkers: Number of MCMC chains.
:type nwalkers: int
:param nsteps: Length of Markov chains.
:type nsteps: int
:param nburn: Length of burn in period of the MCMC chains.
:type nburn: int
:param data_size: Length of the data array.
:type data_size: int
:param dt: Time intervall of the equidistant time array.
:type dt: float
:param drift_slope: Array with the current drift slope estimate in the 0th component.
Component 1, 2 and 3, 4 contain the upper and lower bound of the credibility intervals
defined by the ``scales`` variable.
:type drift_slope: One-dimensional numpy array of floats
:param noise_level_estimate: Array with the noise level estimate in the 0th component.
Component 1, 2 and 3, 4 contain the upper and lower bound of the credibility intervals
defined by the ``scales`` variable.
:type noise_level_estimate: One-dimenional numpy array of floats
:param loop_range: Array with the start indices of the time array for each rolled time window.
:type loop_range: One-dimensional (``data_size - window_size / window_shift + 1``) numpy array of integers
:param window_size: Size of rolling windows.
:type window_size: int
:param data_window: Array that contains data of a certain interval of the dataset in ``data``.
:type data_window: One-dimensional numpy array of floats
:param time_window: Array that contains time of a certain interval of the dataset in ``data``.
:type time_window: One-dimensional numpy array of floats
:param increments: Array that contains the increments from :math:`t` to :math:`t+1`.
:type increments: One-dimensional numpy array of floats
:param joint_samples: Array that contains drawed parameter samples from their joint posterior probability
density function that is estimated from the data in the current ``data_window``.
:type joint_samples: Two-dimensional (``ndim, num_of_joint_samples``) numpy array of floats
:param slope_estimates: Estimates of the drift slope on the current ``data_window``.
:type slope_estimates: One-dimensional numpy array of floats
:param fixed_point_estimate: Fixed point estimate calculated as the mean of the current ``data_window``.
:type fixed_point_estimate: float
:param starting_guesses: Array that contains the initial guesses of the MCMC sampling.
:type starting_guesses: Two-dimensional numpy array of floats
:param theta_array: Array that contains the drift and diffusion parameters sampled with MCMC.
:type theta_array: Two-dimensional numpy array of floats
:param slope_storage: Array that contains the drift slope estimates :math:`\hat{\zeta}`
of the rolling windows. The columns encode each rolling window with the drift
slope estimate :math:`\hat{\zeta}` in the 0th row and in rows 1,2 and 3,4
the upper and lower bound of the credibility intervals defined by ``scales``.
:type slope_storage: Two-dimensional (``5, loop_range.size``) numpy array of floats
:param noise_level_storage: Array that contains the noise level estimates :math:`\hat{\sigma}`
of the rolling windows. The columns encode each rolling window with the noise level
estimate :math:`\hat{\sigma}` in the 0th row and in rows 1,2 and 3,4
the upper and lower bound of the credibility intervals defined by ``scales``.
:type noise_level_storage: Two-dimensional (``5, loop_range.size``) numpy array of floats
:param slope_kernel_density_obj: Kernel density object of ``scipy.gaussian_kde(...)`` of the slope
posterior created with a Gaussian kernel and a bandwidth computed by silverman's rule.
:type slope_kernel_density_obj: ``scipy`` kernel density object of the ``scipy.gaussian_kde(...)`` function.
:param noise_kernel_density_obj: Kernel density object of ``scipy.gaussian_kde(...)`` of the noise level
posterior created with a Gaussian kernel and a bandwidth computed by silverman's rule.
:type noise_level_density_obj: ``scipy`` kernel density object of the ``scipy.gaussian_kde(...)`` function.
:param slow_trend: Contains the subtracted slow trend if a detrending is applied to the whole data
or each window separately.
:type slow_trend: One-dimensional numpy array of floats.
:param detrending_of_whole_dataset: Default is ``None``. If ``'linear'`` the whole ``data`` is detrended linearly. If
``Gaussian kernel smoothed`` a Gaussian smoothed curve is subtracted from the whole ``data``.
The parameters of the kernel smoothing can be set by ``gauss_filter_mode``, ``gauss_filter_sigma``,
``gauss_filter_order``, ``gauss_filter_cval`` and ``gauss_filter_truncate``.
:type detrending_of_whole_dataset: str
:param gauss_filter_mode: According to the ``scipy.ndimage.filters.gaussian_filter`` option.
:type gauss_filter_mode: str
:param gauss_filter_sigma: According to the ``scipy.ndimage.filters.gaussian_filter`` option.
:type gauss_filter_sigma: float
:param gauss_filter_order: According to the ``scipy.ndimage.filters.gaussian_filter`` option.
:type gauss_filter_order: int
:param gauss_filter_cval: According to the ``scipy.ndimage.filters.gaussian_filter`` option.
:type gauss_filter_cval: float
:param gauss_filter_truncate: According to the ``scipy.ndimage.filters.gaussian_filter`` option.
:type gauss_filter_truncate: float
:param plot_detrending: Default is ``False``. If ``True``, the ``self.data`` as well as the
``self.slow_trend`` and the detrended version are shown.
:type plot_detrending: bool
'''
def __init__(self, data, time, drift_model='3rd order polynomial', diffusion_model='constant',
prior_type='Non-informative linear and Gaussian Prior', prior_range=None, scales=None,
detrending_of_whole_dataset = None, gauss_filter_mode = 'reflect',
gauss_filter_sigma = 6, gauss_filter_order = 0, gauss_filter_cval = 0.0,
gauss_filter_truncate = 4.0, plot_detrending = False):
self.drift_model = drift_model
self.diffusion_model = diffusion_model
if drift_model == '3rd order polynomial':
self.num_drift_params = 4
elif drift_model == 'first order polynomial':
self.num_drift_params = 2
else:
print('ERROR: Drift model is not defined!')
if diffusion_model == 'constant':
self.num_diff_params = 1
elif diffusion_model == 'first order polynomial':
self.num_diff_params = 2
else:
print('ERROR: Diffusion model is not defined!')
self.ndim = self.num_drift_params + self.num_diff_params
self.nwalkers = None
self.nsteps = None
self.nburn = None
if np.all(prior_range == None) and drift_model == '3rd order polynomial' and diffusion_model == 'constant':
self.prior_range = np.array([[50., -50.], [50., -50.], [50., -50.], [50., -50.], [50., 0.]])
elif np.all(prior_range == None) and drift_model == 'first order polynomial' and diffusion_model == 'constant':
self.prior_range = np.array([[50., -50.], [50., -50.], [50., 0.]])
else:
self.prior_range = prior_range
if scales == None and prior_type == 'Non-informative linear and Gaussian Prior':
self.scales = np.array([4, 8])
else:
self.scales = scales
self.prior_type = prior_type
self.theta = np.zeros(self.ndim)
self.slow_trend = None
if detrending_of_whole_dataset == None:
self.data = data
else:
self.data, self.slow_trend = self.detrend(time, data, detrending_of_whole_dataset,
gauss_filter_mode,gauss_filter_sigma,
gauss_filter_order,gauss_filter_cval,
gauss_filter_truncate, plot_detrending)
self.data_size = data.size
self.time = time
self.dt = time[1] - time[0]
self.drift_slope = np.zeros(5)
self.noise_level_estimate = np.zeros(5)
self.loop_range = None
self.window_size = None
self.data_window = None
self.time_window = None
self.increments = None
self.joint_samples = None
self.slope_estimates = None
self.fixed_point_estimate = None
self.starting_guesses = None
self.theta_array = None
self.slope_storage = None
self.noise_level_storage = None
self.slope_kernel_density_obj = None
self.noise_kernel_density_obj = None
[docs] @staticmethod
def detrend(time, data, detrend_mode = 'linear', gauss_filter_mode = 'reflect',
gauss_filter_sigma = 6, gauss_filter_order = 0, gauss_filter_cval = 0.0,
gauss_filter_truncate = 4.0, plot_detrending = False):
"""
Method to apply a linear or Gaussian kernel smoothed detrending to the
whole data or to each data window separately.
:param time: Time points of the time series.
:type time: One-dimensional numpy array of floats.
:param data: Time series that is to be detrended.
:type data: One-dimensional numpy array of floats.
:param detrend_mode: Each window is linearly detrended for ``detrend = 'linear'`` and with a Gaussian kernel filter via ``detrend = 'gauss_kernel'``. The linear detrending is the default option.
:type detrend_mode: str
:param gauss_filter_mode: According to the ``scipy.ndimage.filters.gaussian_filter`` option.
:type gauss_filter_mode: str
:param gauss_filter_sigma: According to the ``scipy.ndimage.filters.gaussian_filter`` option.
:type gauss_filter_sigma: float
:param gauss_filter_order: According to the ``scipy.ndimage.filters.gaussian_filter`` option.
:type gauss_filter_order: int
:param gauss_filter_cval: According to the ``scipy.ndimage.filters.gaussian_filter`` option.
:type gauss_filter_cval: float
:param gauss_filter_truncate: According to the ``scipy.ndimage.filters.gaussian_filter`` option.
:type gauss_filter_truncate: float
:return: The first return object contains the detrended data, the second one the subtracted slow trend.
:rtype: One-dimensional numpy arrays.
"""
if detrend_mode == 'linear':
degree = 1
popt = np.polyfit(time, data, deg = degree)
slow_trend = popt[0] * time + popt[1]
detrended_data = data - slow_trend
elif detrend_mode == 'Gaussian kernel smoothed':
slow_trend = gaussian_filter(data, gauss_filter_sigma, order = gauss_filter_order,
output=None, mode= gauss_filter_mode, cval= gauss_filter_cval,
truncate= gauss_filter_truncate)
detrended_data = data - slow_trend
else:
print('ERROR: Type of detrending unknown.')
if plot_detrending:
plt.close()
check_detrending_figure, ax = plt.subplots()
ax.plot(time, data, label='data')
ax.plot(time, slow_trend, label='slow trend')
ax.plot(time, detrended_data, label='detrended data')
ax.legend()
plt.show()
plt.close(check_detrending_figure)
return detrended_data, slow_trend
[docs] def third_order_polynom_slope_in_fixed_point(self):
'''
Calculate the slope_estimates of a third order polynomial drift function with joint_samples of the
posterior distribution around a fixed point estimate given as the data mean of the current window.
'''
self.fixed_point_estimate = np.mean(self.data_window)
self.slope_estimates = self.joint_samples[1, :] + 2 * self.joint_samples[2,
:] * self.fixed_point_estimate + 3 * self.joint_samples[3,
:] * self.fixed_point_estimate ** 2
[docs] @staticmethod
@numba.jit(nopython=True)
def calc_D1(theta, data, drift_model):
'''
Returns the drift parameterized by a first or third order polynomial for input data.
Static function in order to speed up computation time via ``numba.jit(nopython = True)``.
'''
if drift_model == '3rd order polynomial':
return theta[0] + theta[1] * data + theta[2] * data ** 2 + theta[3] * data ** 3
elif drift_model == 'first order polynomial':
return theta[0] + theta[1] * data
[docs] def D1(self):
'''
Calls the static ``calc_D1`` function and returns its value.
'''
return self.calc_D1(self.theta, self.data_window[:-1], self.drift_model)
[docs] @staticmethod
@numba.jit(nopython=True)
def calc_D2(theta, data, diffusion_model):
'''
Returns the diffusion parameterized by a constant or first order polynomial for input data.
Static function in order to speed up computation time via ``numba.jit(nopython = True)``.
'''
if diffusion_model == 'constant':
return theta[-1] * np.ones(data.size)
elif diffusion_model == 'first order polynomial':
return theta[-2] + theta[-1] * data
[docs] def D2(self):
'''
Calls the static ``calc_D2`` function and returns its value.
'''
return self.calc_D2(self.theta, self.data_window[:-1], self.diffusion_model)
[docs] def log_prior(self):
'''
Returns the logarithmic prior probability for a given set of parameters based on the short time
propagator depending on the drift and diffusion models and the given ``prior_range`` of the
parameters ``theta``. A flat prior for the parameters apart from restriction to positive constant
diffusion and a non-informative prior for linear parts and Gaussian priors for higher orders are
implemented. The standard deviation of the Gaussian priors is given by the ``scales`` variable.
'''
if self.drift_model == '3rd order polynomial' and self.diffusion_model == 'constant':
if self.prior_type == 'Flat Prior':
if (self.prior_range[0, 0] > self.theta[0] > self.prior_range[0, 1]) and (
self.prior_range[1, 0] > self.theta[1] > self.prior_range[1, 1]) and (
self.prior_range[2, 0] > self.theta[2] > self.prior_range[2, 1]) and (
self.prior_range[3, 0] > self.theta[3] > self.prior_range[3, 1]) and (
self.prior_range[4, 0] > self.theta[4] > self.prior_range[4, 1]):
return 0
else:
return - np.inf
elif self.prior_type == 'Non-informative linear and Gaussian Prior':
if (self.prior_range[0, 0] > self.theta[0] > self.prior_range[0, 1]) and (
self.prior_range[1, 0] > self.theta[1] > self.prior_range[1, 1]) and (
self.prior_range[2, 0] > self.theta[2] > self.prior_range[2, 1]) and (
self.prior_range[3, 0] > self.theta[3] > self.prior_range[3, 1]) and (
self.prior_range[4, 0] > self.theta[4] > self.prior_range[4, 1]):
# print('Check_prior!')
return ((-3. / 2.) * np.log(1 + self.theta[1] ** 2) - np.log(self.theta[4])) + np.log(
cpy.norm.pdf(self.theta[2], loc=0, scale=self.scales[0])) + np.log(
cpy.norm.pdf(self.theta[3], loc=0, scale=self.scales[1]))
else:
return - np.inf
elif self.drift_model == 'first order polynomial' and self.diffusion_model == 'constant':
if self.prior_type == 'Flat Prior':
if (self.prior_range[0, 0] > self.theta[0] > self.prior_range[0, 1]) and (
self.prior_range[1, 0] > self.theta[1] > self.prior_range[1, 1]) and (
self.prior_range[2, 0] > self.theta[2] > self.prior_range[2, 1]):
return 0
else:
return - np.inf
elif self.prior_type == 'Non-informative linear and Gaussian Prior':
if (self.prior_range[0, 0] > self.theta[0] > self.prior_range[0, 1]) and (
self.prior_range[1, 0] > self.theta[1] > self.prior_range[1, 1]) and (
self.prior_range[2, 0] > self.theta[2] > self.prior_range[2, 1]):
return ((-3. / 2.) * np.log(1 + self.theta[1] ** 2) - np.log(self.theta[2]))
else:
return - np.inf
[docs] def log_likelihood(self):
'''
Returns the logarithmic likelihood of the data for the given model parametrization.
'''
return np.sum(
-0.5 * np.log(2 * np.pi * self.D2() ** 2 * self.dt) - (self.increments - self.D1() * self.dt) ** 2 / (
2. * self.D2() ** 2 * self.dt))
[docs] def log_posterior(self, theta):
'''
Returns the logarithmic posterior probability of the data for the given model parametrization.
'''
self.theta = theta
lg_prior = self.log_prior()
if not np.isfinite(lg_prior):
return - np.inf
return lg_prior + self.log_likelihood()
[docs] def neg_log_posterior(self, theta):
'''
Returns the negative logarithmic posterior distribution of the data
for the given model parametrization.
'''
self.theta = theta
return (-1) * self.log_posterior(theta)
[docs] def declare_MAP_starting_guesses(self, nwalkers=None, nsteps=None):
'''
Declare the maximum a posterior (MAP) starting guesses for a MCMC sampling with ``nwalkers``
and ``nsteps``.
'''
if nwalkers != None and nsteps != None:
self.nwalkers = nwalkers
self.nsteps = nsteps
if self.nwalkers == None and nwalkers == None or self.nsteps == None and nsteps == None:
print('ERROR: nwalkers and/or nsteps is not yet defined!')
res = optimize.minimize(self.neg_log_posterior, x0=np.ones(self.ndim), method='Nelder-Mead')
MAP_results = res['x']
self.starting_guesses = np.ones((self.nwalkers, self.ndim))
for i in range(self.ndim):
self.starting_guesses[:, i] = MAP_results[i] * (0.5 + np.random.rand(self.nwalkers))
if self.drift_model == '3rd order polynomial' and self.diffusion_model == 'constant':
self.starting_guesses[self.starting_guesses[:, i] > self.prior_range[i, 0], i] = self.prior_range[
i, 0] - 1
self.starting_guesses[self.starting_guesses[:, i] < self.prior_range[i, 1], i] = self.prior_range[
i, 1] + 1
[docs] def compute_posterior_samples(self, print_AC_tau, ignore_AC_error, thinning_by, print_progress):
'''
Compute the ``theta_array`` with :math:`nwalkers \cdot nsteps` Markov Chain Mone Carlo samples.
If ``ignore_AC_error = False`` the calculation will terminate with error if
the autocorrelation of the sampled chains is to high compared to the chain length.
Otherwise the highest autocorrelation length will be used to thin the sampled chains.
In order to run tests in shorter time you can set ``ignore_AC_error = True`` and
define a ``thinning_by`` n steps. If ``print_AC_tau = True`` the autocorrelation lengths of the
sampled chains is printed.
'''
print('Calculate posterior samples')
sampler = emcee.EnsembleSampler(self.nwalkers, self.ndim, self.log_posterior)
sampler.run_mcmc(self.starting_guesses, self.nsteps,
progress=print_progress) # if progress = True the remaining sample time is shown
if ignore_AC_error == False:
tau = sampler.get_autocorr_time()
elif ignore_AC_error:
tau = thinning_by
if print_AC_tau:
print('tau: ', tau)
flat_samples = sampler.get_chain(discard=self.nburn, thin=int(np.max(tau)), flat=True)
self.theta_array = np.zeros((self.ndim, flat_samples[:, 0].size))
for i in range(self.ndim):
self.theta_array[i, :] = np.transpose(flat_samples[:, i])
[docs] @staticmethod
def init():
'''
Defines the animation background for easy visualization of a rolling window scan of a given time
series. It prepares plots for the time series and the time evolution of the drift slope
estimate :math:`\hat{\zeta}` and noise level estimate :math:`\hat{\sigma}` and its probability
density estimate. The method needs to be static. Some variables have to be declared global in
to guarantee the functionality of the animation procedure.
'''
global fig, axs, sl_gr_animation, noise_gr_animation, animation_count, animation_time, animation_data # vspan_window, noise_pdf_line, drift_slope_line, noise_line, CB_slope_I, CB_slope_II, CB_noise_I, CB_noise_II
axs[0, 0].plot(animation_time, animation_data, color='C0') # , color = 'b'
axs[0, 0].set_xlim(animation_time[0], animation_time[-1])
if crit_poi != None:
axs[0, 0].axvline(crit_poi, ls=':', color='r')
axs[0, 0].set_ylim(np.min(animation_data) - 1, np.max(animation_data) + 1)
axs[0, 0].set_ylabel(r'variable $x$', fontsize=15)
axs[0, 0].set_xlabel(r'time $t$', fontsize=15)
axs[0, 1].set_xlim(noise_gr_animation[0], noise_gr_animation[-1])
axs[0, 1].set_ylim(0, 100)
if noi_le != None:
axs[0, 1].axvline(noi_le, ls=':', color='g')
axs[0, 1].set_ylabel(r'Noise Posterior $P(\hat{\theta_4})$', fontsize=15)
axs[0, 1].set_xlabel(r'noise level $\hat{\theta_4}$', fontsize=15)
axs[1, 0].set_xlim(animation_time[0], animation_time[-1])
axs[1, 0].plot(animation_time, np.zeros(animation_time.size), ls=':', color='r')
if crit_poi != None:
axs[1, 0].axvline(crit_poi, ls=':', color='r')
axs[1, 0].set_ylabel(r'max posterior slope $\zeta^{\rm max}$', fontsize=15)
axs[1, 0].set_xlabel(r'time $t$', fontsize=15)
axs[1, 1].set_xlim(animation_time[0], animation_time[-1])
axs[1, 1].set_ylim(noise_gr_animation[0], noise_gr_animation[-1])
if noi_le != None:
axs[1, 1].plot(animation_time, noi_le * np.ones(animation_time.size), ls=':', color='g')
axs[1, 1].set_ylabel(r'max posterior noise $\theta^{\rm max}_4$', fontsize=15)
axs[1, 1].set_xlabel(r'time $t$', fontsize=15)
axs[0, 0].tick_params(axis='both', which='major', labelsize=15)
axs[0, 1].tick_params(axis='both', which='major', labelsize=15)
axs[1, 0].tick_params(axis='both', which='major', labelsize=15)
axs[1, 1].tick_params(axis='both', which='major', labelsize=15)
fig.tight_layout()
vspan_window = axs[0, 0].axvspan(0, 0, alpha=0.7, color='g')
noise_pdf_line.set_data([], [])
drift_slope_line.set_data([], [])
noise_line.set_data([], [])
CB_slope_I = axs[1, 0].fill_between([], [], [], alpha=0.7, color='orange')
CB_slope_II = axs[1, 0].fill_between([], [], [], alpha=0.4, color='orange')
CB_noise_I = axs[1, 1].fill_between([], [], [], alpha=0.7, color='orange')
CB_noise_II = axs[1, 1].fill_between([], [], [], alpha=0.4, color='orange')
return vspan_window, noise_pdf_line, drift_slope_line, noise_line, CB_slope_I, CB_slope_II, CB_noise_I, CB_noise_II
# performs the animation
[docs] def animation(self, i, slope_grid, noise_grid, nwalkers, nsteps, nburn, n_joint_samples, n_slope_samples,
n_noise_samples, cred_percentiles, print_AC_tau, ignore_AC_error, thinning_by, print_progress,
detrending_per_window, gauss_filter_mode,gauss_filter_sigma, gauss_filter_order,
gauss_filter_cval, gauss_filter_truncate, plot_detrending):
'''
Function that is called iteratively by the ``matplotlib.animation.FuncAnimation(...)`` tool
to generate an animation of the rolling window scan of a time series. The time series, the rolling
windows, the time evolution of the drift slope estimate :math:`\hat{\zeta}` and the noise level
estimate :math:`\hat{\sigma}` and its posterior probality density is shown in the animation.
'''
global animation_count
self.window_shift = i
self.calc_driftslope_noise(slope_grid, noise_grid, nwalkers, nsteps, nburn, n_joint_samples, n_slope_samples,
n_noise_samples, cred_percentiles, print_AC_tau, ignore_AC_error, thinning_by,
print_progress, detrending_per_window,gauss_filter_mode,gauss_filter_sigma,
gauss_filter_order, gauss_filter_cval, gauss_filter_truncate, plot_detrending)
self.slope_storage[:, animation_count] = self.drift_slope
self.noise_level_storage[:, animation_count] = self.noise_level_estimate
axs[0, 0].collections.clear()
if i == 0:
path = vspan_window.get_xy()
path[:, 0] = [self.time_window[0], self.time_window[0], self.time_window[-1], self.time_window[-1]]
vspan_window.set_xy(path)
if i > 0:
path = vspan_window.get_xy()
path[:, 0] = [self.time_window[0], self.time_window[0], self.time_window[-1], self.time_window[-1],
self.time_window[0]]
vspan_window.set_xy(path)
noise_pdf_line.set_data(noise_gr_animation, self.noise_kernel_density_obj(noise_gr_animation))
drift_slope_line.set_data(self.time[self.window_size - 1 + self.loop_range[:animation_count + 1]],
self.slope_storage[0, :animation_count + 1])
noise_line.set_data(self.time[self.window_size - 1 + self.loop_range[:animation_count + 1]],
self.noise_level_storage[0, :animation_count + 1])
axs[1, 0].collections.clear()
axs[1, 1].collections.clear()
CB_slope_I = axs[1, 0].fill_between(self.time[self.window_size - 1 + self.loop_range[:animation_count + 1]],
self.slope_storage[1, :animation_count + 1],
self.slope_storage[2, :animation_count + 1], alpha=0.7, color='orange')
CB_slope_II = axs[1, 0].fill_between(self.time[self.window_size - 1 + self.loop_range[:animation_count + 1]],
self.slope_storage[3, :animation_count + 1],
self.slope_storage[4, :animation_count + 1], alpha=0.4, color='orange')
CB_noise_I = axs[1, 1].fill_between(self.time[self.window_size - 1 + self.loop_range[:animation_count + 1]],
self.noise_level_storage[1, :animation_count + 1],
self.noise_level_storage[2, :animation_count + 1], alpha=0.7,
color='orange')
CB_noise_II = axs[1, 1].fill_between(self.time[self.window_size - 1 + self.loop_range[:animation_count + 1]],
self.noise_level_storage[3, :animation_count + 1],
self.noise_level_storage[4, :animation_count + 1], alpha=0.4,
color='orange')
animation_count += 1
return vspan_window, noise_pdf_line, drift_slope_line, noise_line, CB_slope_I, CB_slope_II, CB_noise_I, CB_noise_II
[docs] def calc_driftslope_noise(self, slope_grid, noise_grid, nwalkers=50, nsteps=10000, nburn=200,
n_joint_samples=50000, n_slope_samples=50000, n_noise_samples=50000,
cred_percentiles=[16, 1], print_AC_tau=False,
ignore_AC_error=False, thinning_by=60, print_progress=False,
detrending_per_window = None, gauss_filter_mode = 'reflect',
gauss_filter_sigma = 6, gauss_filter_order = 0, gauss_filter_cval = 0.0,
gauss_filter_truncate = 4.0, plot_detrending = False):
'''
Calculates the drift slope estimate :math:`\hat{\zeta}` and the noise level :math:`\hat{\sigma}`
from the MCMC sampled parameters of a Langevin model for a given ``window_size`` and given ``window_shift``.
In the course of the computations the parameters ``joint_samples``, a ``noise_kernel_density_obj``,
a ``slope_kernel_density_obj``, the ``drift_slope`` with credibility intverals, the ``noise_level`` with
credibility intervals and in case of a third order polynomial drift the estimated `fixed_point_estimate`
of the window will be stored.
:param slope_grid: Array on which the drift slope kernel density estimate is evaluated.
:type slope_grid: One-dimensional numpy array of floats
:param noise_grid: Array on which the noise level kernel density estimate is evaluated.
:type noise_grid: One-dimensional numpy array of floats
:param nwalkers: Number of walkers that are initialized for the MCMC sampling via the package ``emcee``.
Default is 50.
:type nwalkers: int
:param nsteps: Length of the sampled MCMC chains. Default is 10000.
:type nsteps: int
:param nburn: Number of data points at the beginning of the Markov chains which are discarded
in terms of a burn in period. Default is 200.
:type nburn: int
:param n_joint_samples: Number of joint samples that are drawn from the estimated joint posterior
probability in order to calculate the drift slope estimate :math:`\zeta` and
corresponding credibility bands. Default is 50000.
:type n_joint_samples: int
:param n_slope_samples: Number of drift slope samples that are drawn from the estimated drift slope
posterior computed based on the sampling results of the joint posterior probability
of the Langevin parameters :math:`\theta_i`. Default is 50000.
:type n_slope_samples: int
:param n_noise_samples: Number of constant noise level samples that are drawn from the estimated
marignal probility distribution of the diffusion model is chosen constant.
Default is 50000.
:type n_noise_samples: int
:param cred_percentiles: Two entries to define the percentiles of the calculated credibility bands
of the estimated parameters. Default is ``numpy.array([16,1])``.
:type cred_percentiles: One-dimensional numpy array of integers
:param print_AC_tau: If ``True`` the estimated autocorrelation lengths of the Markov chains is shown.
The maximum length is used for thinning of the chains. Default is ``False``.
:type prind_AC_tau: bool
:param ignore_AC_error: If ``True`` the autocorrelation lengths of the Markov chains is not estimated
and thus, it is not checked whether the chains are too short to give an reliable
autocorrelation estimate. This avoids error interruption of the procedure, but
can lead to unreliable results if the chains are too short. The option should
be chosen in order to save computation time, e.g. when debugging your code
with short Markov chains. Default is ``False``.
:type ignore_AC_error: bool
:param thinning_by: If ``ignore_AC_error = True`` the chains are thinned by ``thinning_by``. Every
``thinning_by``-th data point is stored. Default is 60.
:type thinning_by: int
:param print_progress: If ``True`` the progress of the MCMC sampling is shown. Default is ``False``.
:type print_progress: bool
:param detrending_per_window: Default is ``None``. If ``'linear'``, the ``self.data_window`` is detrended linearly. If
``Gaussian kernel smoothed``, a Gaussian smoothed curve is subtracted from ``self.data_window``.
The parameters of the kernel smoothing can be set by ``gauss_filter_mode``, ``gauss_filter_sigma``,
``gauss_filter_order``, ``gauss_filter_cval`` and ``gauss_filter_truncate``. Drift slope and
noise level are estimated after detrending in these cases.
:type detrending_per_window: str
:param gauss_filter_mode: According to the ``scipy.ndimage.filters.gaussian_filter`` option.
:type gauss_filter_mode: str
:param gauss_filter_sigma: According to the ``scipy.ndimage.filters.gaussian_filter`` option.
:type gauss_filter_sigma: float
:param gauss_filter_order: According to the ``scipy.ndimage.filters.gaussian_filter`` option.
:type gauss_filter_order: int
:param gauss_filter_cval: According to the ``scipy.ndimage.filters.gaussian_filter`` option.
:type gauss_filter_cval: float
:param gauss_filter_truncate: According to the ``scipy.ndimage.filters.gaussian_filter`` option.
:type gauss_filter_truncate: float
:param plot_detrending: Default is ``False``. If ``True``, the ``self.data`` as well as the
``self.slow_trend`` and the detrended version are shown.
:type plot_detrending: bool
'''
cred_percentiles = np.array(cred_percentiles)
self.data_window = np.roll(self.data, shift=- self.window_shift)[:self.window_size]
self.time_window = np.roll(self.time, shift=- self.window_shift)[:self.window_size]
if detrending_per_window != None:
self.data_window, self.slow_trend = self.detrend(self.time_window, self.data_window,
detrending_per_window,gauss_filter_mode,
gauss_filter_sigma,gauss_filter_order,
gauss_filter_cval, gauss_filter_truncate,
plot_detrending)
self.increments = self.data_window[1:] - self.data_window[:-1]
self.declare_MAP_starting_guesses()
self.compute_posterior_samples(print_AC_tau=print_AC_tau, ignore_AC_error=ignore_AC_error,
thinning_by=thinning_by, print_progress=print_progress)
if self.drift_model == '3rd order polynomial':
self.joint_kernel_density_obj = cpy.gaussian_kde(self.theta_array, bw_method='silverman')
self.joint_samples = self.joint_kernel_density_obj.resample(size=n_joint_samples)
self.third_order_polynom_slope_in_fixed_point()
self.noise_kernel_density_obj = cpy.gaussian_kde(self.theta_array[4, :], bw_method='silverman')
elif self.drift_model == 'first order polynomial':
self.slope_estimates = self.theta_array[:, 1]
self.noise_kernel_density_obj = cpy.gaussian_kde(self.theta_array[2, :], bw_method='silverman')
self.slope_kernel_density_obj = cpy.gaussian_kde(self.slope_estimates, bw_method='silverman')
slope_samples = self.slope_kernel_density_obj.resample(size=n_slope_samples)
max_slope = slope_grid[
self.slope_kernel_density_obj(slope_grid) == np.max(self.slope_kernel_density_obj(slope_grid))]
if max_slope.size == 1:
self.drift_slope[0] = max_slope
else:
self.drift_slope[0] = max_slope[0]
slope_credibility_percentiles = np.percentile(slope_samples, [cred_percentiles[0], 100 - cred_percentiles[0]])
self.drift_slope[1] = slope_credibility_percentiles[0]
self.drift_slope[2] = slope_credibility_percentiles[1]
slope_credibility_percentiles = np.percentile(slope_samples, [cred_percentiles[1], 100 - cred_percentiles[1]])
self.drift_slope[3] = slope_credibility_percentiles[0]
self.drift_slope[4] = slope_credibility_percentiles[1]
noise_samples = self.noise_kernel_density_obj.resample(size=n_noise_samples)
noise_level = noise_grid[
self.noise_kernel_density_obj(noise_grid) == np.max(self.noise_kernel_density_obj(noise_grid))]
if noise_level.size == 1:
self.noise_level_estimate[0] = noise_level
else:
self.noise_level_estimate[0] = noise_level[0]
noise_credibility_percentiles = np.percentile(noise_samples, [cred_percentiles[0], 100 - cred_percentiles[0]])
self.noise_level_estimate[1] = noise_credibility_percentiles[0]
self.noise_level_estimate[2] = noise_credibility_percentiles[1]
noise_credibility_percentiles = np.percentile(noise_samples, [cred_percentiles[1], 100 - cred_percentiles[1]])
self.noise_level_estimate[3] = noise_credibility_percentiles[0]
self.noise_level_estimate[4] = noise_credibility_percentiles[1]