Source code for pimkl.run

from .models import PIMKL
from .evaluation import roc_analysis
from .utils.preprocessing import (
    labels_to_one_hot_code_using_dict, Standardizer
)
from .data import get_learning_data_in_dict_mode, get_learning_data
import logging

logger = logging.getLogger('run_pimkl')


def fold_generator(
    number_of_folds, data, labels, max_per_class,
    transformer_class=Standardizer
):
    """Yield one train/test split of ``data`` (and ``labels``) per fold.

    The splitting itself is delegated to ``get_learning_data_in_dict_mode``
    when ``data`` is a dict and to ``get_learning_data`` otherwise (the
    original summary of this function describes the splits as class
    balanced). Every fold is an independent call to that helper with the
    same arguments.

    After splitting, a new ``transformer_class`` instance is created,
    ``apply`` is called on the training data and ``reapply`` on the test
    data, so that normalization is learned only on train data. In dict
    mode this is done separately for every data type.

    Args:
        number_of_folds: number of splits to yield; folds are numbered
            from 0.
        data: either a dict keyed by data type or a single data object;
            forwarded to the splitting helper.
        labels: labels forwarded to the splitting helper, or ``None``, in
            which case the helper is expected to return only
            ``(X_train, X_test)`` and both label entries are ``None``.
        max_per_class: forwarded unchanged to the splitting helper.
        transformer_class: zero-argument callable returning an object
            with ``apply`` and ``reapply`` methods.

    Yields:
        dict with the keys ``'X_train'``, ``'y_train'``, ``'X_test'``,
        ``'y_test'`` and ``'fold'``.
    """
    for fold in range(number_of_folds):
        if isinstance(data, dict):
            data_type_labels = list(data.keys())
            if labels is None:
                X_train, X_test = get_learning_data_in_dict_mode(
                    data, labels=labels, data_types=data_type_labels,
                    max_per_class=max_per_class
                )
                y_train = None
                y_test = None
            else:
                X_train, y_train, X_test, y_test = get_learning_data_in_dict_mode(  # noqa
                    data, labels=labels, data_types=data_type_labels,
                    max_per_class=max_per_class
                )
            for data_type in data_type_labels:
                # learn normalization only on train data
                transformer = transformer_class()
                X_train[data_type] = transformer.apply(X_train[data_type])
                X_test[data_type] = transformer.reapply(X_test[data_type])
        else:
            if labels is None:
                X_train, X_test = get_learning_data(
                    data, labels=labels, max_per_class=max_per_class
                )
                y_train = None
                y_test = None
            else:
                X_train, y_train, X_test, y_test = get_learning_data(
                    data, labels=labels, max_per_class=max_per_class
                )
            # A transformer must be created here as well: previously this
            # branch used `transformer` without ever assigning it, which
            # raised UnboundLocalError for any non-dict `data`.
            transformer = transformer_class()
            # learn normalization only on train data
            X_train = transformer.apply(X_train)
            X_test = transformer.reapply(X_test)
        yield {
            'X_train': X_train,
            'y_train': y_train,
            'X_test': X_test,
            'y_test': y_test,
            'fold': fold
        }
def run_model(
    inducers, induction_name, mkl_name, estimator_name, mkl_parameters,
    estimator_parameters, induction_parameters, inducers_extended_names,
    fold_parameters
):
    """Train and evaluate a PIMKL model on a single fold.

    The leading arguments are passed on to the ``PIMKL`` constructor.
    ``inducers_extended_names`` supplies the names that are paired with
    the learned kernel weights and trace factors, and ``fold_parameters``
    is one of the dicts yielded by ``fold_generator``.

    Combined with ``functools.partial`` and ``fold_generator`` this can be
    used to run folds in parallel, e.g.
    ``list(pool.imap(partial(run_model, ...), fold_generator(...)))``.

    Returns:
        ``(aucs, weights, trace_factors)`` on success, or ``None`` when
        any exception is raised while training or evaluating (the error
        is printed and logged instead of propagated).

        * ``aucs``: AUC values from ``roc_analysis`` keyed by class label;
          ``{'class': 0.0}`` when the fold carries no labels.
        * ``weights``: dict mapping inducer name to kernel weight, or,
          when ``model.kernels_weights`` is two-dimensional, a list of
          ``(class label, {name: weight})`` tuples.
        * ``trace_factors``: dict mapping inducer name to trace factor.
    """
    X_train, y_train, X_test, y_test, fold = (
        fold_parameters.get(key)
        for key in ('X_train', 'y_train', 'X_test', 'y_test', 'fold')
    )
    try:
        logger.debug('Training fold {}.'.format(fold))
        model = PIMKL(
            inducers=inducers,
            induction=induction_name,
            mkl=mkl_name,
            estimator=estimator_name,
            induction_parameters=induction_parameters,
            mkl_parameters=mkl_parameters,
            estimator_parameters=estimator_parameters
        )
        if y_train is not None and y_test is not None:
            # supervised fold: fit with labels and score on the test split
            model.fit(X_train, y_train)
            # the class order comes from the MKL model when it exposes
            # one, otherwise from the estimator
            if hasattr(model.mkl_model_, 'get_classes_order'):
                classes_order = model.mkl_model_.get_classes_order()
            else:
                classes_order = model.estimator_model_.get_classes_order()
            label_to_index = {
                label: index for index, label in enumerate(classes_order)
            }
            index_to_label = {
                index: label for label, index in label_to_index.items()
            }
            y_test_one_hot_code = labels_to_one_hot_code_using_dict(
                y_test, label_to_index
            )
            # prediction
            y_score = model.predict_proba(X_test)
            _, _, _, aucs_by_index = roc_analysis(
                y_test_one_hot_code, y_score
            )
            # report AUCs by class label, falling back to the raw key
            aucs = {
                index_to_label.get(index, index): value
                for index, value in aucs_by_index.items()
            }
            # results
            if len(model.kernels_weights.shape) == 2:
                # EasyMKL 1vRest
                weights = [
                    (
                        index_to_label.get(index, index),
                        dict(
                            zip(
                                inducers_extended_names,
                                class_kernels_weights
                            )
                        )
                    )
                    for index, class_kernels_weights in enumerate(
                        model.kernels_weights.T
                    )
                ]
            else:
                weights = dict(
                    zip(inducers_extended_names, model.kernels_weights)
                )
        else:
            # no labels available: fit only and report a placeholder AUC
            model.fit(X_train)
            aucs = {'class': 0.0}
            weights = dict(
                zip(inducers_extended_names, model.kernels_weights)
            )
        # built the same way for both kinds of fold
        trace_factors = dict(
            zip(inducers_extended_names, model.mkl_model_.trace_factors)
        )
    except Exception as exc:
        print(str(exc))
        logger.exception(exc)
        print('Problem in training for fold {}.'.format(fold))
        return None
    return aucs, weights, trace_factors