Source code for trendpy.samplers

# -*- coding: utf-8 -*-

# samplers.py

# MIT License

# Copyright (c) 2017 Rene Jean Corneille

# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:

# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.

# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.

from __future__ import absolute_import

from numpy import eye, zeros, dot, array, diag, sqrt, mean

from scipy.stats import multivariate_normal, invgamma, invgauss, gamma
from numpy.linalg import inv, norm

from trendpy.globals import derivative_matrix

__all__ = ['Parameter','Parameters','Sampler','L1']

[docs]class Parameter(object): """ Implements an unknown parameter to be estimated Examples -------- We first need to import the wanted posterior distribution in `Scipy`: >>> from scipy.stats import norm and then we can instanciate parameter: >>> param1 = Parameter('lambda',norm,(1,1),0.1) """
[docs] def __init__(self, name, distribution, size, current_value=None): """ Creates a parameter to estimate in the MCMC algorithm. :param name: Name of the parameter (unique identification) :type name: string :param distribution: Posterior Probability distribution of the parameter. :type distribution: `Scipy.stats.rv_continuous` :param size: Dimension of the parameter. :type name: tuple :param current_value: Current value of the parameter :type current_value: array """ self.name = str(name) self.distribution = distribution self.size = size self.current_value = current_value
@property def current_value(self): """Parameter current value (last generated)""" return self.__current_value @current_value.setter def current_value(self, current_value): self.__current_value = current_value def __str__(self): return """ parameter name : %s parameter distribution : %s """ % (self.name, self.distribution.__str__()) def __len__(self): return 1
[docs] def is_multivariate(self): """ Checks if the parameter is univariate.""" return not self.size == (1,1)
[docs]class Parameters(object): """ Implements the set of parameters to be estimated Examples -------- We first need to import the wanted posterior distribution in `Scipy.stats`: >>> from scipy.stats import invgamma then we can create an empty parameter set and add a new parameter: >>> param1 = Parameter('sigma2',invgamma,(1,1),0.09) >>> params = Params() >>> params.append(param1) >>> print(params) """
[docs] def __init__(self, list=None, hierarchy=None): """ Creates a parameter set to estimate in the MCMC algorithm. :param list: A dictionary with the parameters to estimate :type list: dict :param hierarchy: List containing the order in which the Gibbs sampler updates the parameter values. :type hierarchy: array """ self.list = list self.hierarchy = hierarchy
@property def list(self): """ Dictionary containing the parameters to be estimated. """ return self.__list @list.setter def list(self, new_value): self.__list = new_value if new_value is not None else {} @property def hierarchy(self): """ List containing the order in which the Gibbs sampler updates the parameter values. """ return self.__hierarchy @hierarchy.setter def hierarchy(self, new_value): self.__hierarchy = new_value if new_value is not None else [] def __len__(self): return len(self.list) def __str__(self): descr = '(parameters: ----------------------- \n' descr += ', \n'.join(['name: %s, distribution: %s, size: %s' % (str(l.name), l.distribution.__str__(), l.size) for l in self.list.values()]) descr += '\n ----------------------- )' return descr def __getitem__(self, key): if isinstance(key,str): try: return self.list[key] except KeyError: print("Key %s not found in parameter set" % key) except: print("Wrong key") elif isinstance(key,int): try: return self.list[self.hierarchy[key]] except KeyError: print("Key %s not found in parameter set" % key) except IndexError: print("Index out of bounds: %s > %s" % (key,len(self.hierarchy))) else: raise TypeError("Wrong Type") def __delitem__(self,key): pass def __contains__(self, item): if isinstance(item,Parameter): try: return item.name in self.hierarchy except KeyError: print("Key %s not found in parameter set" % key) except: print("Wrong key: %s" % item.name) else: raise TypeError("Wrong Type")
[docs] def append(self, parameter): """ Adds a parameter to the parameter set. First parameter added is the first in the hierarchy. :param parameter: parameter to estimate :type parameter: trendpy.Parameter """ if not parameter.name in self.list: self.list[parameter.name] = parameter self.hierarchy.append(parameter.name)
def clear(self): """ Removes all parameters.""" self.list = None self.hierarchy = None
[docs]class Sampler(object): """ Abstract class for implementing Gibbs sampling algorithms and providing outputs.""" def __init__(self): self.parameters = None self.data = None self.options = None self.derivative_matrix = None self.parameters = None
[docs] def define_parameters(self): """ Method to set the parameter set to be updated in the MCMC algorithm. """ raise NotImplementedError("Must be overriden")
[docs] def initial_value(self,parameter_name): """ Method that sets the initial value of the parameters to be estimated. :param parameter_name: name of the parameter. :type parameter_name: str :return: initial value of the parameter :rtype: `Numpy.dnarray` """ raise NotImplementedError("Must be overriden")
[docs] def distribution_parameters(self, parameter_name): """ Method that sets the parameters of the posterior distribution of the parameters to be estimated. :param parameter_name: name of the parameter. :type parameter_name: str :return: dictionary the parameters needed to compute the next value of the Markov chain for the parameter with name: parameter_name. :rtype: dict """ raise NotImplementedError("Must be overriden")
[docs] def generate(self,parameter_name): """ This method handles the generation of the random draws of the Markov chain for each parameters. :param parameter_name: name of the parameter of interest :type parameter_name: string :return: random draw from the posterior probability distribution :rtype: `Numpy.dnarray` """ raise NotImplementedError("Must be overriden")
[docs] def output(self, simulations, burn, parameter_name): """ Computes the poserior mean of the parameters. :param simulations: history of the Markov chain simulation :type simulations: dict :param burn: number of draws dismissed as burning samples :type burn: int :param parameter_name: name of the parameter of interest :type parameter_name: string :return: output of the MCMC algorithm :rtype: `Numpy.dnarray` """ raise NotImplementedError("Must be overriden")
class Factory(object): def create(self,*args,**kwargs): return Sampler()
class L1(Sampler): def __init__(self,data,alpha=0.1,rho=0.1,total_variation_order=2): self.rho = rho self.alpha = alpha self.__data = data self.size = len(data) self.total_variation_order = total_variation_order self.derivative_matrix = derivative_matrix(self.size, self.total_variation_order) self.define_parameters() @property def data(self): return self.__data @property def parameters(self): """ List containing the parameters to estimate.""" return self.__parameters @parameters.setter def parameters(self, new_value): self.__parameters = new_value if new_value is not None else [] def define_parameters(self): params=Parameters() params.append(Parameter("trend", multivariate_normal, (self.size,1))) params.append(Parameter("sigma2", invgamma, (1,1))) params.append(Parameter("lambda2", gamma, (1,1))) params.append(Parameter("omega", invgauss, (self.size-self.total_variation_order,1))) self.parameters = params def initial_value(self,parameter_name): if parameter_name=='trend': return array([(4*i+10)/20 for i in range(self.size)]) elif parameter_name=='sigma2': return 0.8 elif parameter_name=='lambda2': return 1 elif parameter_name==str('omega'): return 0.8*array([(30*(i/2)+3)/(2*(i/2)+35) for i in range(self.size-self.total_variation_order)]) def distribution_parameters(self, parameter_name): if parameter_name=='trend': E = dot(dot(self.derivative_matrix.T,inv(diag(self.parameters.list['omega'].current_value))),self.derivative_matrix) mean = dot(inv(eye(self.size)+E),self.data) cov = (self.parameters.list['sigma2'].current_value)*inv(eye(self.size)+E) return {'mean' : mean, 'cov' : cov} elif parameter_name=='sigma2': E = dot(dot(self.derivative_matrix.T,inv(diag(self.parameters.list['omega'].current_value))),self.derivative_matrix) pos = self.size loc = 0 scale = 0.5*dot((self.data-dot(eye(self.size),self.parameters.list['trend'].current_value)).T,(self.data-dot(eye(self.size),self.parameters.list['trend'].current_value)))+0.5*dot(dot(self.parameters.list['trend'].current_value.T,E),self.parameters.list['trend'].current_value) elif parameter_name=='lambda2': pos = self.size-self.total_variation_order-1+self.alpha loc = 0.5*(norm(dot(self.derivative_matrix,self.parameters.list['trend'].current_value),ord=1))/self.parameters.list['sigma2'].current_value+self.rho scale = 1 elif parameter_name==str('omega'): pos = [sqrt(((self.parameters.list['lambda2'].current_value**2)*self.parameters.list['sigma2'].current_value)/(dj**2)) for dj in dot(self.derivative_matrix,self.parameters.list['trend'].current_value)] loc = 0 scale = self.parameters.list['lambda2'].current_value**2 return {'pos' : pos, 'loc' : loc, 'scale' : scale} def generate(self,parameter_name): distribution = self.parameters.list[parameter_name].distribution parameters = self.distribution_parameters(parameter_name) if parameter_name=='trend': return distribution.rvs(parameters['mean'],parameters['cov']) elif parameter_name=='omega': return array([1/distribution.rvs(parameters['pos'][i],loc=parameters['loc'],scale=parameters['scale']) for i in range(len(self.parameters.list['omega'].current_value))]).reshape(self.parameters.list['omega'].current_value.shape) return distribution.rvs(parameters['pos'],loc=parameters['loc'],scale=parameters['scale']) #pb with the parameter name def output(self, simulations, burn, parameter_name): out = mean(simulations[parameter_name][:,:,burn:],axis=2) return out class Factory(object): def create(self,*args,**kwargs): return L1(args[0],total_variation_order=kwargs['total_variation_order'])