"""
Provide a simple, easy to use model for fitting data and especially
distributions. The model is capable of having "components", which can
be defined and fitted individually.
"""
from functools import reduce
import numpy as np
import dashi as d
import pylab as p
import iminuit
import types
import functools
import inspect
from copy import deepcopy as copy
import scipy.optimize as optimize
import seaborn as sb
from . import functions as funcs
from .. import utils as u
Logger = u.Logger
# Default color for model-prediction curves; prefer seaborn's "dark"
# palette when seaborn is functional, otherwise fall back to plain red.
default_color = "r"
try:
    default_color = sb.color_palette("dark")[2]
except Exception as e:
    # Best-effort: a broken seaborn install should not break this module.
    Logger.warn("Can not use seaborn dark colorpalette for setting the default color! Exception thrown {}".format(e))
def copy_func(f):
    """
    Recreate the function f independently, i.e. produce a new function
    object sharing f's code, globals, defaults and closure.
    Based on http://stackoverflow.com/a/6528148/190597 (Glenn Maynard)

    Args:
        f (callable): the function to be cloned

    Returns:
        callable: an independent clone of f
    """
    clone = types.FunctionType(
        f.__code__,
        f.__globals__,
        name=f.__name__,
        argdefs=f.__defaults__,
        closure=f.__closure__,
    )
    # Carry over __name__, __doc__, __dict__ etc. like a decorator would.
    clone = functools.update_wrapper(clone, f)
    clone.__kwdefaults__ = f.__kwdefaults__
    return clone
def concat_functions(fncs):
    """
    Inspect functions and construct a new one which returns the added result.
    concat_functions(A(x, apars), B(x, bpars)) -> C(x, apars, bpars)
    C(x, apars, bpars) returns (A(x, apars) + B(x, bpars))

    The generated function has real named parameters (apars/bpars with a
    component-index suffix), so it can be introspected with
    inspect.getfullargspec, as e.g. create_minuit_pardict requires.

    Args:
        fncs (list): The callables to concat

    Returns:
        tuple (callable, list(pars))
    """
    # Name of the data axis ("x") is taken from the first function.
    datapar = inspect.getfullargspec(fncs[0]).args[0]
    pars = [inspect.getfullargspec(f).args[1:] for f in fncs]
    npars = [len(prs) for prs in pars]
    # Suffix each parameter with its component index to avoid clashes
    # between components that use the same parameter names.
    renamed_pars = [[k + str(i) for k in ipars] for i, ipars in enumerate(pars)]
    joint_pars = reduce(lambda x, y: x + y, renamed_pars)
    # One slice per component, selecting its parameters from joint_pars.
    slices = []
    lastslice = 0
    for n in npars:
        slices.append(slice(lastslice, n + lastslice))
        lastslice += n
    # BUGFIX: exec into a private namespace instead of globals() - the
    # original permanently injected concat_functions_* names into the
    # module namespace on every call.
    namespace = {"concat_functions_fncs": fncs,
                 "concat_functions_slices": slices}
    TEMPLATE = """def jointfunc({}):
    data = concat_functions_fncs[0](*([{}] + ([{}][concat_functions_slices[0]])))
    for i, fn in enumerate(concat_functions_fncs[1:]):
        data += fn(*([{}] + ([{}][concat_functions_slices[1:][i]])))
    return data
""".format(datapar + "," + ",".join(joint_pars),
           datapar, ",".join(joint_pars),
           datapar, ",".join(joint_pars))
    # exec is safe here: the template is built from parameter names taken
    # from our own functions, not from external input.
    exec(TEMPLATE, namespace)
    return namespace["jointfunc"], joint_pars
def construct_efunc(x, data, jointfunc, joint_pars):
    """
    Construct a least-squares error function. This function will then
    be minimized, e.g. with the help of minuit.

    Args:
        x (np.ndarray): The x-values the fit should be evaluated on
        data (np.ndarray): The y-values of the data we want to describe
        jointfunc (callable): The full data model with all components
        joint_pars (tuple): The model parameters

    Returns:
        callable: efunc(par1, par2, ...) -> sum of squared residuals
    """
    # ensure that "x" is the first parameter, even if it
    # has a different name
    datapar = inspect.getfullargspec(jointfunc).args[0]
    # BUGFIX: exec into a private namespace - the original injected
    # "data", "jointfunc" and the data parameter name (typically "x")
    # into the module globals on every call, silently clobbering any
    # module-level names of the same spelling.
    namespace = {"jointfunc": jointfunc,
                 datapar: x,
                 "data": data}
    EFUNC = """def efunc({}):
    return ((abs(data - jointfunc({}))**2).sum())""".format(
        ",".join(joint_pars), datapar + "," + ",".join(joint_pars))
    exec(EFUNC, namespace)
    return namespace["efunc"]
def create_minuit_pardict(fn, startparams, errors, limits, errordef):
    """
    Construct a keyword dictionary for minuit fitting, holding the start
    value, error estimate and limits for each parameter of fn, plus the
    errordef.

    Args:
        fn (callable): The function to be minimized; its argument names
                       define the parameter names.
        startparams (tuple): A list of startparameters. One each per parameter
        errors (list): 1-sigma error estimates, one per parameter, or None
        limits (list(tuple)): A list of (min, max) tuples for each parameter, can be None
        errordef (float): The errordef should be 1 for a least square fit
                          (for what this all is constructed for) or 0.5 in
                          case of a likelihood fit

    Returns:
        dict
    """
    mindict = {}
    for index, parname in enumerate(inspect.getfullargspec(fn).args):
        mindict[parname] = startparams[index]
        if errors is not None:
            mindict["error_" + parname] = errors[index]
        if limits is not None:
            mindict["limit_" + parname] = (limits[index][0], limits[index][1])
    mindict["errordef"] = errordef
    return mindict
class Model(object):
    """
    Describe data with a prediction. The Model class allows to set a function
    for data prediction, and fit it to the data by the means of a chi2 fit.
    It is possible to use a collection of functions to describe a complex model,
    e.g Gaussian + some exponential tail.
    The individual models can be fitted independently, which results in
    sum_i n_i degrees of freedom for i models with n_i parameters each, or
    alternatively they can be coupled and share parameters, which results in
    sum_i n_i - n_ij degrees of freedom where n_ij is a shared parameter.
    """

    def __init__(self, func,
                 startparams=None,
                 limits=((-np.inf, np.inf),),
                 errors=(10.,),
                 func_norm=1):
        """
        Args:
            func (fnc): The function which shall model the data.
                        It has to be of the form f(x, par1, par2, ...).
                        Only 1d fits are supported, and "x" must be the
                        first argument.

        Keyword Args:
            Will be passed to iminuit:
            startparams (tuple): A set of startparameters. 1 start parameter
                                 per function parameter. A good choice of
                                 start parameters helps the fit a lot.
            limits (tuple): individual limit min/max for each parameter
                            1 tuple (min/max) per parameter
            errors (tuple): One value per parameter, giving an 1sigma error
                            estimate

            Additional keywords:
            func_norm (float): multiply the result of func when evaluated with norm
        """
        # Without explicit start parameters, fall back to all zeroes.
        # The first argument of func is the data axis, not a free parameter.
        # FIXME: better estimate?
        if startparams is None:
            startparams = [0] * (func.__code__.co_argcount - 1)
        # Keep an independent clone so later mutation of func elsewhere
        # cannot change the model.
        self._callbacks = [copy_func(func)]
        self.startparams = list(startparams)
        self.errors = None
        self.n_params = [len(startparams)]
        self.best_fit_params = list(startparams)
        self.coupling_variable = []
        self.all_coupled = False
        self.data = None
        self.data_errs = None
        self.xs = None
        self.chi2_ndf = None
        self.chi2_ndf_components = []
        self.norm = 1
        self.ndf = 1
        self.func_norm = [float(func_norm)]
        # Full model prediction: sum over all components evaluated at xs.
        self.prediction = lambda xs: reduce(
            lambda lhs, rhs: lhs + rhs,
            [self.func_norm[k] * f(xs) for k, f in enumerate(self.components)])
        self.first_guess = None
        self._distribution = None
        self._is_categorical = False
        self.covariance_matrix = None
    @property
    def distribution(self):
        """The dashi histogram holding the data to be described (or None)."""
        return self._distribution
    def set_distribution(self, distr):
        """
        Adding a distribution to the model. The distribution shall contain
        the data we want to model.

        Args:
            distr (dashi.histogram): histogram holding the data

        Returns:
            None
        """
        self._distribution = distr
#def construct_minuit_function(self):
# func_args, coupling = self.extract_parameters()
# parameter_set = ["a","b","c","d","e","f","g"]
# if self.all_coupled:
# func_args = func_args[0]
# # x is always 0
# fnc_src = """def to_minimize({}):\n\t
# """
# #def to_minimize()
@property
def n_free_params(self):
"""
The number of free parameters of this model. The free
parameter in a least square fit are
number of data points - fit parameters.
Returns:
int
"""
return sum(self.n_free_params)
    def _create_distribution(self, data, bins,
                             normalize=False, density=True):
        """
        Create a distribution out of the data with the help of
        dashi.

        Args:
            data (np.ndarray): input data
            bins (np.ndarray or int): bins to forward to the histogram creation

        Keyword Args:
            normalize (bool): normalize the resulting distribution by nentries
            density (bool): if together with normalize, then normalize the
                            distribution by area, as for a pdf for example.
                            The area under the curve will then be unity.

        Returns:
            None
        """
        # A scalar bin argument means "number of bins": span them evenly
        # over the data range.
        if np.isscalar(bins):
            bins = np.linspace(min(data), max(data), bins)
        h = d.factory.hist1d(data, bins)
        if normalize:
            h_norm = h.normalized(density=density)
            # Recover the scale between raw and normalized bin contents;
            # any finite ratio works, so take the first one.
            norm = h.bincontent / h_norm.bincontent
            norm = norm[np.isfinite(norm)][0]
            self.norm = norm
            self.set_distribution(h_norm)
        else:
            self.norm = 1
            self.set_distribution(h)
        # Cache the y-values and x-positions used by the fit.
        self.data = self.distribution.bincontent
        self.xs = self.distribution.bincenters
    def add_first_guess(self, func):
        """
        Use func to estimate better startparameters for the initialization
        of the fit.

        Args:
            func (callable): The function func has to have the same amount
                             of parameters as we have startparameters.

        Returns:
            None
        """
        # Only a single component (or a fully coupled model) is supported,
        # since the guess yields one flat list of start parameters.
        assert self.all_coupled or len(self.n_params) == 1, "Does not work yet if not all variables are coupled"
        self.first_guess = func
    def eval_first_guess(self, data):
        """
        Assign a new set of start parameters obtained by calling the
        first guess method.

        Args:
            data (np.ndarray): input data, used to evaluate the first guess method.

        Returns:
            None
        """
        assert self.first_guess is not None, "No first guess method provided! Run Model.add_first_guess "
        newstartparams = list(self.first_guess(data))
        # The guess must produce exactly one value per start parameter.
        assert len(newstartparams) == len(self.startparams), "first guess algorithm must yield startparams!"
        self.startparams = newstartparams
    def couple_models(self, coupling_variable):
        """
        Couple the models by a variable, which means use the variable not
        independently in all model components, but fit it only once.
        E.g. if there are 3 models with parameters p1, p2, k each and they
        are coupled by k, parameters p11, p21, p12, p22, and k will be fitted
        instead of p11, p12, k1, p21, p22, k2.

        Args:
            coupling_variable: variable number of the number in startparams.
                               This *must* be the index to the respective tuple.

        Returns:
            None
        """
        # Index-based coupling only makes sense if every component has
        # the same number of parameters.
        assert len(np.unique(self.n_params)) == 1,\
            "Models have different numbers of parameters,difficult to couple!"
        self.coupling_variable.append(coupling_variable)
    def construct_error_function(self, startparams, errors, limits, errordef):
        """
        Construct the error function together with the necessary parameters
        for minuit.

        Args:
            startparams (tuple): A set of startparameters. 1 start parameter
                                 per function parameter. A good choice of
                                 start parameters helps the fit a lot.
            errors (tuple): One value per parameter, giving an 1sigma error
                            estimate
            limits (tuple): individual limit min/max for each parameter
                            1 tuple (min/max) per parameter
            errordef (float): The errordef should be 1 for a least square fit
                              (for what this all is constructed for) or 0.5 in
                              case of a likelihood fit

        Returns:
            tuple (callable, dict)
        """
        # Merge all components into one callable, wrap it into a
        # least-squares error function over the stored data, and build
        # the matching minuit keyword dictionary.
        concated, concated_pars = concat_functions(self._callbacks)
        error_func = construct_efunc(self.xs, self.data, concated, concated_pars)
        pardict = create_minuit_pardict(error_func, startparams, errors, limits, errordef)
        return error_func, pardict
    def couple_all_models(self):
        """
        "Lock" the model after all components have been added. This
        will determine a set of startparameters.
        After this, no other models can be coupled/added any more.

        Returns:
            None
        """
        self.all_coupled = True
        # All components now share the first component's parameter set.
        self.startparams = self.startparams[0:self.n_params[0]]
    def __add__(self, other):
        """
        Add another component to the model. This allows for more
        complex functions like gaussian + exponential.

        NOTE(review): only callbacks, start parameters, parameter counts
        and normalizations are merged; data, errors, limits and coupling
        settings of the operands are NOT carried over - confirm this is
        intended before relying on it.

        Args:
            other (Model): A full fledged model which has to be
                           initialized with its own set of
                           startparameters

        Returns:
            Model : A multi-component Model
        """
        # Dummy model, immediately overwritten below.
        newmodel = Model(lambda x: x)
        # hack the new model to be like self
        newmodel._callbacks = self._callbacks + other._callbacks
        newmodel.startparams = self.startparams + other.startparams
        newmodel.n_params = self.n_params + other.n_params
        newmodel.best_fit_params = newmodel.startparams
        newmodel.func_norm = self.func_norm + other.func_norm
        return newmodel
@property
def components(self):
lastslice = 0
thecomponents = []
for i, cmp in enumerate(self._callbacks):
thisslice = slice(lastslice, self.n_params[i] + lastslice)
tmpcmp = copy(cmp) # hack - otherwise it will not work :\
lastslice += self.n_params[i]
best_fit = self.best_fit_params[thisslice]
if self.all_coupled:
best_fit = self.best_fit_params[0:self.n_params[0]]
yield lambda xs: self.func_norm[i]*tmpcmp(xs, *best_fit)
def __call__(self, xs, *params):
"""
Return the model prediction
Args:
xs (np.ndaarray): the values the model should be evaluated on
Returns:
np.ndarray
"""
thecomponents = []
firstparams = params[0:self.n_params[0]]
first = self.func_norm[0]*self._callbacks[0](xs, *firstparams)
lastslice = self.n_params[0]
for i, cmp in enumerate(self._callbacks[1:]):
thisslice = slice(lastslice, self.n_params[1:][i] + lastslice)
# tmpcmp = copy(cmp)
theparams = list(params[thisslice])
if self.coupling_variable:
for k in self.coupling_variable:
theparams[k] = firstparams[k]
elif self.all_coupled:
theparams = firstparams
# thecomponents.append(lambda xs: tmpcmp(xs, *params[thisslice]))
lastslice += self.n_params[1:][i]
first += self.func_norm[i]*cmp(xs, *theparams)
return first
    def add_data(self, data,
                 data_errs=None,
                 bins=200,
                 create_distribution=False,
                 normalize=False,
                 density=True,
                 xs=None,
                 subtract=None):
        """
        Add some data to the model, in preparation for the fit. There are two
        modes of this:
        1) Data needs to be histogrammed, then make sure to set
           'bins' appropriately and set 'create_distribution'
        2) Data needs NOT to be histogrammed. In that case, bins has no
           meaning. For a meaningful calculation of chi2, the errors of the
           data points need to be given to data_errs

        Args:
            data (np.array) : input data

        Keyword Args:
            data_errs (np.array) : errors of the data for chi2 calculation
                                   (only used when not histogramming)
            bins (int/np.array) : number of bins or bin array to be passed
                                  to the histogramming routine
            create_distribution (bool) : data requires the creation of a histogram
                                         first before fitting
            subtract (callable) : subtract subtract(xs) from the data
                                  (only applied in the non-histogrammed mode)
            normalize (bool) : normalize the data before adding
            density (bool) : if normalized, assume the data is a pdf.
                             if False, use bincount for normalization.
            xs (np.array) : x-positions of the data points
                            (required when not histogramming)

        Returns:
            None
        """
        if np.isscalar(bins):
            nbins = bins
        else:
            nbins = len(bins)
        if create_distribution:
            self._create_distribution(data, bins, normalize, density=density)
            self._is_categorical = True
            # Degrees of freedom: one data point per bin.
            self.ndf = nbins - len(self.startparams)
        else:
            assert xs is not None, "Have to give xs if not histogramming!"
            if data_errs is not None:
                assert len(data_errs) == len(data), "Data and associated errrors must have the same size"
            else:
                # Without given errors fall back to unit errors, keeping
                # the chi2 formula well defined.
                data_errs = np.ones(len(data))
            self.data = data
            self.data_errs = data_errs
            self.xs = xs
            self.ndf = len(data) - len(self.startparams)
            self.bins = None
            if subtract is not None:
                self.data -= subtract(self.xs)
    def fit_to_data(self, silent=False,
                    use_minuit=True,
                    errors=None,
                    limits=None,
                    errordef=1,
                    debug_minuit=False,
                    **kwargs):
        """
        Apply this model to data. This will perform the fit with the help
        of either minuit or scipy.optimize.

        Keyword Args:
            silent (bool) : silence output
            use_minuit (bool) : use minuit for fitting
            errors (list) : errors for minuit, see minuit manual
            limits (list of tuples): limits for minuit, see minuit manual
            errordef (float) : typically 1 for chi2 fit and 0.5 for llh fit;
                               this class is currently set up as a least square
                               fit, so this should not be changed
            debug_minuit (int) : if True, attach the iminuit instance to the model
                                 so that it can be inspected later on. Will raise error
                                 if use_minuit is set to False at the same time
            **kwargs: will be passed on to scipy.optimize.curve_fit

        Returns:
            the best-fit parameters
        """
        if not use_minuit and debug_minuit:
            raise ValueError("You can not debug minuit when you are not using it!")
        startparams = self.startparams
        if not silent: print("Using start params...", startparams)
        fitkwargs = {"maxfev": 1000000, "xtol": 1e-10, "ftol": 1e-10}
        if "bounds" in kwargs:
            # NOTE(review): with bounds, curve_fit switches to a solver
            # that takes max_nfev instead of maxfev - presumably a scipy
            # quirk rather than a matplotlib one; confirm.
            fitkwargs.pop("maxfev")
            fitkwargs["max_nfev"] = 1000000
        fitkwargs.update(kwargs)
        if use_minuit:
            errorfunc, params = self.construct_error_function(self.startparams,
                                                              errors,
                                                              limits,
                                                              errordef)
            # for debugging reasons we attach the minuit instance
            # to the model if desired
            m = iminuit.Minuit(errorfunc, **params)
            m.migrad()
            values = m.values
            if not silent: print(values, "result")
            parameters = []
            # Recover the fitted values in the order of the error
            # function's signature (var2pos maps name -> position).
            for k in sorted(m.var2pos, key=m.var2pos.get):
                if not silent: print(k)
                parameters.append(m.values[k])
            self.errors = m.errors
            self.covariance_matrix = m.covariance
        else:
            parameters, covariance_matrix = optimize.curve_fit(self, self.xs,
                                                               self.data, p0=startparams,
                                                               **fitkwargs)
            self.covariance_matrix = covariance_matrix
            self.errors = []
            # 1-sigma errors: square roots of the covariance diagonal.
            for i, row in enumerate(self.covariance_matrix):
                for j, entry in enumerate(row):
                    if i == j:
                        self.errors.append(np.sqrt(entry))
        if not silent: print("Fit yielded parameters", parameters)
        if (not silent) and (not use_minuit): print("{:4.2f} NANs in covariance matrix".format(len(self.covariance_matrix[np.isnan(np.asarray(covariance_matrix))])))
        # simple GOF: chi2 of the (de-normalized) model against the data
        if self._is_categorical:
            chi2 = (funcs.calculate_chi_square(self.norm*self.data, self.norm * self(self.xs, *parameters)))
        else:
            # calculate the chi2
            chi2 = (funcs.calculate_reduced_chi_square(self.norm*self.data, self.norm * self(self.xs, *parameters), self.data_errs))
        self.chi2_ndf = chi2/self.ndf
        if not silent and use_minuit: print("Function value at minimum {:4.2e}".format(m.fval))
        if not silent: print("Obtained chi2 : {:4.2f}; ndf : {:4.2f}; chi2/ndf {:4.2f}".format(chi2, self.ndf, self.chi2_ndf))
        if not silent: print("##########################################")
        self.best_fit_params = parameters
        if debug_minuit:
            self._m_instance = m
        return parameters
[docs] def get_minuit_instance(self):
"""
If a previous fit has been done with the debug_minuit instance
then it now can be accessed.
"""
if not hasattr(self, '_m_instance'):
Logger.warn('Minuit instance not available. Execute `fit_to_data` with `debug_minuit` set to True')
return None
else:
return self._m_instance
    def clear(self):
        """
        Reset the model. This basically deletes all components and
        resets the startparameters.

        Returns:
            None
        """
        # Re-run __init__ with only the first component and its slice of
        # the start parameters, discarding data, fit results and any
        # additional components.
        self.__init__(self._callbacks[0], self.startparams[:self.n_params[0]])
[docs] def plot_result(self, ymin=1000, xmax=8,\
ylabel="normed bincount",\
xlabel="Q [C]", fig=None,\
log=True,\
figure_factory=None,\
axes_range="auto",
model_alpha=.3,\
add_parameter_text=((r"$\mu_{{SPE}}$& {:4.2e}\\",0),),
histostyle="scatter",
datacolor="k",
modelcolor=default_color):
"""
Show the fit result, together with the fitted data.
Args:
ymin (float): limit the yrange to ymin
xmax (float): limit the xrange to xmax
model_alpha (float): 0 <= x <= 1 the alpha value of the lineplot
for the model
ylabel (str): label for yaxis
log (bool): plot in log scale
figure_factory (fnc): Use to generate the figure
axes_range (str): the "field of view" to show
fig (pylab.figure): A figure instance
add_parameter_text (tuple): Display a parameter in the table on the plot
((text, parameter_number), (text, parameter_number),...)
datacolor (str): color for the data points
modelcolor (str): color for the model prediction
Returns:
pylab.figure
"""
assert self.chi2_ndf is not None, "Needs to be fitted first before plotting!"
def auto_adjust_limits(ax, data, xs):
scalemax, scalemin = 1.1, 0.9
ymax, ymin = scalemax*max(data), scalemin*min(data)
xmax, xmin = scalemax*max(xs[abs(data) > 0]), scalemin*min(xs[abs(data) >0])
ax.set_xlim(xmin, xmax)
ax.set_ylim(ymin, ymax)
return ax
if figure_factory is not None:
fig = figure_factory()
elif fig is None:
fig = p.figure()
ax = fig.gca()
if self.distribution is not None:
self.distribution.__getattribute__(histostyle)(color=datacolor)
else:
ax.plot(self.xs, self.data, color=datacolor)
infotext = r"\begin{tabular}{ll}"
ax.plot(self.xs, self.prediction(self.xs), color=default_color, alpha=model_alpha)
for comp in self.components:
ax.plot(self.xs, comp(self.xs), linestyle=":", lw=1, color="k")
infotext += r"$\chi^2/ndf$ & {:4.2f}\\".format(self.chi2_ndf)
ax.set_ylim(ymin=ymin)
ax.set_xlim(xmax=xmax)
ax.set_xlabel(xlabel)
ax.set_ylabel(ylabel)
if axes_range == "auto":
ax = auto_adjust_limits(ax, self.data, self.xs)
if self.distribution is not None:
infotext += r"entries& {}\\".format(self.distribution.stats.nentries)
if add_parameter_text is not None:
for partext in add_parameter_text:
infotext += partext[0].format(self.best_fit_params[partext[1]])
#infotext += r"$\mu_{{SPE}}$& {:4.2e}\\".format(self.best_fit_params[mu_spe_is_par])
infotext += r"\end{tabular}"
ax.text(0.9, 0.9, infotext,
horizontalalignment='center',
verticalalignment='center',
transform=ax.transAxes)
if log: ax.set_yscale("symlog", linthreshy=1)
#sb.despine()
return fig