"""Pathway Induced Multiple Kernel Learning."""
import logging
import numpy as np
from sklearn.base import BaseEstimator, ClassifierMixin
from ..factories import MKL_FACTORY, ESTIMATOR_FACTORY, INDUCTION_FACTORY
from ..utils.objects import is_sequence, is_sequence_of_sequence
logger = logging.getLogger(__name__)
def _update_kernels(
kernels, lhs, rhs, induction, inducer, induction_parameters
):
kernel = induction(lhs, rhs, inducer, **induction_parameters)
kernels.append(kernel)
def _update_kernels_from_inducers(
    inducers, kernels, lhs, rhs, induction, induction_parameters
):
    """Append one induced kernel per inducer to ``kernels``.

    ``lhs`` and ``rhs`` are each copied once into a column-major
    (Fortran-ordered) ``float64`` array, and those two arrays are reused
    for every inducer. ``kernels`` is extended in place, following the
    iteration order of ``inducers``. Nothing is returned.
    """
    # Convert up front so the conversion is not repeated per inducer; the
    # original note states this is done to optimize the kernel computation.
    left, right = (
        np.array(data, order='F', dtype=np.float64) for data in (lhs, rhs)
    )
    for an_inducer in inducers:
        _update_kernels(
            kernels, left, right, induction, an_inducer, induction_parameters
        )
def _update_kernels_multiple_data(
    inducers, kernels, lhs, rhs, induction, induction_parameters
):
    """Append induced kernels for several data sets to ``kernels``.

    Two layouts are handled:

    * ``lhs``, ``rhs`` and ``inducers`` are all dicts: for every key of
      ``lhs`` the data ``lhs[key]`` / ``rhs[key]`` is combined with the
      inducers stored under ``inducers[key]``.
    * anything else: ``lhs`` and ``rhs`` are walked in lockstep with
      ``zip`` and every pair is combined with the full ``inducers``
      collection.

    ``kernels`` is extended in place; nothing is returned.
    """
    all_dicts = all(
        isinstance(candidate, dict) for candidate in (lhs, rhs, inducers)
    )
    if all_dicts:
        # per-key data with its own dedicated inducers
        groups = ((lhs[key], rhs[key], inducers[key]) for key in lhs)
    else:
        # positional data sharing the same inducers
        groups = (
            (left, right, inducers) for left, right in zip(lhs, rhs)
        )
    for a_lhs, a_rhs, some_inducers in groups:
        _update_kernels_from_inducers(
            some_inducers, kernels, a_lhs, a_rhs, induction,
            induction_parameters
        )
class PIMKL(BaseEstimator, ClassifierMixin):
    """Pathway Induced Multiple Kernel Learning
    with choice of MKL and estimator algorithm.

    Kernels are induced from the data with every inducer, combined by the
    MKL model and, when the MKL model cannot predict probabilities itself,
    handed to a separate estimator.
    Estimator is only trained when MKL is not an estimator itself.
    """

    # Defaults used when the matching constructor argument is None. They are
    # copied per instance, so they are never shared or mutated.
    _DEFAULT_MKL_PARAMETERS = {
        'k': 5,
        'epsilon': 0.0001,
        'maxiter_qp': 100000,
        'kernel_normalization': True,
        'precompute': True
    }
    _DEFAULT_ESTIMATOR_PARAMETERS = {
        'lam': 0.2,
        'epsilon': 1e-5,
        'regularization_factor': False,
        'kernel_normalization': False,
        'precompute': True
    }

    def __init__(
        self,
        inducers,
        induction='induce_linear_kernel',
        mkl='UMKLKNN',
        estimator='EasyMKL',
        induction_parameters=None,
        mkl_parameters=None,
        estimator_parameters=None
    ):
        """Instantiate a PIMKL object.

        Args:
            inducers: inducers handed one by one to the induction function
                (a dict of inducers per data key is supported when the data
                is given as dicts as well).
            induction: key, or sequence of keys, into INDUCTION_FACTORY.
            mkl: key into MKL_FACTORY.
            estimator: key into ESTIMATOR_FACTORY.
            induction_parameters: keyword arguments for the induction
                function. None means no extra arguments.
            mkl_parameters: keyword arguments for the MKL model. None
                selects a copy of the class defaults.
            estimator_parameters: keyword arguments for the estimator. None
                selects a copy of the class defaults.
        """
        self.inducers = inducers
        self.induction = induction
        self.mkl = mkl
        self.estimator = estimator
        # Explicitly passed dicts are stored as given (required for
        # sklearn's clone identity check); only the defaults are created
        # fresh for every instance.
        self.induction_parameters = (
            {} if induction_parameters is None else induction_parameters
        )
        self.mkl_parameters = (
            dict(self._DEFAULT_MKL_PARAMETERS)
            if mkl_parameters is None else mkl_parameters
        )
        self.estimator_parameters = (
            dict(self._DEFAULT_ESTIMATOR_PARAMETERS)
            if estimator_parameters is None else estimator_parameters
        )

    def get_params(self, deep=True):
        """Get model parameters.

        ``deep`` is accepted for scikit-learn compatibility and is not used.
        """
        return {
            'inducers': self.inducers,
            'induction': self.induction,
            'mkl': self.mkl,
            'estimator': self.estimator,
            'induction_parameters': self.induction_parameters,
            'mkl_parameters': self.mkl_parameters,
            'estimator_parameters': self.estimator_parameters
        }

    def set_params(self, **parameters):
        """Set model parameters as attributes and return the model."""
        for parameter, value in parameters.items():
            setattr(self, parameter, value)
        return self

    def set_mkl_params(self, **parameters):
        """Update entries of the MKL model parameters."""
        # Rebind a merged copy so a dict owned by the caller is not mutated.
        self.mkl_parameters = dict(self.mkl_parameters, **parameters)

    def set_estimator_params(self, **parameters):
        """Update entries of the estimator parameters."""
        # Rebind a merged copy so a dict owned by the caller is not mutated.
        self.estimator_parameters = dict(
            self.estimator_parameters, **parameters
        )

    def fit(self, X, y=None):
        """Fit the model.

        The training data and labels are kept on the instance (``lhs_`` and
        ``y_``) because prediction computes kernels between the training
        data and the new data.
        Estimator is only trained when MKL is not an estimator, and only if
        labels are given.

        Returns:
            the fitted model itself.
        """
        logger.debug('PIMKL.fit() start')
        self.mkl_model_ = MKL_FACTORY[self.mkl](**self.mkl_parameters)
        self.lhs_ = X
        self.y_ = y
        # prepare the kernels
        kernels = self._get_kernels(self.lhs_)
        # fit mkl
        self.mkl_model_.fit(kernels, self.y_)
        self.kernels_weights = self.mkl_model_.kernels_weights
        # in case of single binary problem, ensure kernels_weights is 1D
        try:
            binary_problems = self.kernels_weights.shape[1]
        except IndexError:
            # weights are one-dimensional already
            pass
        else:
            if binary_problems == 1:
                self.kernels_weights = self.kernels_weights[:, 0]
        # when mkl is not an estimator, fit estimator
        if hasattr(self.mkl_model_, 'predict_proba'):
            self.estimator_model_ = None
            logger.debug('PIMKL.fit() done, is estimator and fitted already')
            return self
        if self.y_ is None:
            self.estimator_model_ = None
        else:
            logger.debug('train given estimator')
            # The estimator is fed the already combined kernel, so these two
            # options are forced. A merged copy is rebound rather than
            # updating the dict in place, keeping caller-owned dicts intact.
            self.estimator_parameters = dict(
                self.estimator_parameters,
                trace_normalization=False,
                precompute=True
            )
            self.estimator_model_ = ESTIMATOR_FACTORY[self.estimator](
                **self.estimator_parameters
            )
            self.estimator_model_.fit(
                [self.mkl_model_.get_optimal_kernel()], self.y_
            )
            logger.debug('given estimator done')
        logger.debug('PIMKL.fit() done')
        return self

    def predict(self, X):
        """
        Predict using trained model.

        It returns the optimal kernel using learned weights or,
        in case labels were fitted in training, the predicted labels.
        When the MKL model provides ``predict_proba``, the index of the
        most probable column is returned for every sample.
        """
        # prepare the kernels
        kernels = self._get_kernels(self.lhs_, X)
        # Same capability check as in fit() and predict_proba(); an
        # AttributeError raised inside the MKL model is no longer swallowed.
        if hasattr(self.mkl_model_, 'predict_proba'):
            return np.argmax(self.mkl_model_.predict_proba(kernels), axis=1)
        combined_kernel = self.mkl_model_.predict(kernels)
        if self.estimator_model_ is not None:
            return self.estimator_model_.predict([combined_kernel])
        return combined_kernel

    def predict_proba(self, X):
        """Predict probabilities using trained model.

        Raises:
            RuntimeError: if neither the MKL model nor a trained estimator
                can provide probabilities.
        """
        mkl_is_estimator = hasattr(self.mkl_model_, 'predict_proba')
        if not mkl_is_estimator and self.estimator_model_ is None:
            raise RuntimeError(
                'predict_proba valid only if trained passing labels'
            )
        # prepare the kernels
        kernels = self._get_kernels(self.lhs_, X)
        if mkl_is_estimator:
            return self.mkl_model_.predict_proba(kernels)
        # predict probabilities
        return self.estimator_model_.predict_proba(
            [self.mkl_model_.predict(kernels)]
        )

    def _get_kernels(self, lhs, rhs=None):
        """Induce the list of kernels between ``lhs`` and ``rhs``.

        ``rhs`` defaults to ``lhs``. Kernels are produced for every
        induction, every data set (when several are given) and every
        inducer, in that nesting order.

        Raises:
            ValueError: if several data sets are given and ``lhs`` and
                ``rhs`` differ in length.
            NotImplementedError: if several sets of induction parameters
                are given.
        """
        logger.debug('_get_kernels() start')
        if rhs is None:
            rhs = lhs
        multiple_data = is_sequence(lhs) and is_sequence(rhs)
        if multiple_data and len(lhs) != len(rhs):
            raise ValueError(
                'Mismatch in lenght of lhs:{} and rhs:{}'.format(
                    len(lhs), len(rhs)
                )
            )
        if is_sequence_of_sequence(self.induction_parameters):
            # TODO given a set of different induction_parameters to be used:
            raise NotImplementedError(
                'multiple sets of induction_parameters are not supported'
            )
        # treat a single induction as a one-element collection
        if is_sequence(self.induction):
            inductions = self.induction
        else:
            inductions = [self.induction]
        if multiple_data:
            update = _update_kernels_multiple_data
        else:
            update = _update_kernels_from_inducers
        # prepare the combined kernels
        kernels = []
        for induction in inductions:
            update(
                self.inducers, kernels, lhs, rhs,
                INDUCTION_FACTORY[induction], self.induction_parameters
            )
        logger.debug('_get_kernels() done')
        return kernels

    # Pickling support
    def __getstate__(self):
        """Return the instance state without the fitted model objects."""
        state = self.__dict__.copy()
        # Remove the unpicklable entries.
        state.pop('mkl_model_', None)
        state.pop('estimator_model_', None)
        return state

    def __setstate__(self, state):
        """Restore the state and refit the dropped models when possible."""
        # Restore instance attributes.
        self.__dict__.update(state)
        # The models are not pickled: retrain them from the stored training
        # data if the instance had been fitted before pickling.
        if hasattr(self, 'y_') and hasattr(self, 'lhs_'):
            self.fit(self.lhs_, self.y_)