from __future__ import division, print_function, absolute_import
from builtins import range
import os.path as op
import numpy as np
import nibabel as nib
import pandas as pd
import joblib  # sklearn.externals.joblib was removed in newer scikit-learn; use the standalone package
from scipy import stats
from itertools import combinations
from scipy.special import comb  # comb moved from scipy.misc to scipy.special in modern SciPy
from copy import copy
from sklearn.metrics import confusion_matrix


class MvpResults(object):
    """
    .. _ReadTheDocs: http://skbold.readthedocs.io

    Class to keep track of model evaluation metrics and feature scores.
    See the ReadTheDocs_ homepage for more information on its API and use.

Parameters
----------
mvp : mvp-object
Necessary to extract some metadata from.
n_iter : int
Number of folds that will be kept track of.
type_model : str
Either 'classification' or 'regression'
    feature_scoring : str
        Which method to use to calculate feature-scores with. Can be:

        1) 'ufs': univariate feature scores (e.g. the ``scores_`` attribute
           of scikit-learn's univariate feature-selection transformers)
        2) 'fwm': feature weight mapping [1]_ - keep track of
           raw voxel-weights (coefficients)
        3) 'forward': transform raw voxel-weights to the corresponding
           forward model [2]_.
confmat : bool
Whether to keep track of the confusion-matrix across folds (only
relevant for `type_model='classification'`)
verbose : bool
Whether to print extra output.
**metrics : keyword-arguments
Keyword arguments of the form: `name_metric: metric_function`;
any metric from scikit-learn works (or other metrics, as long as
they have two input args, `y_true` and `y_pred`).

    References
    ----------
.. [1] Stelzer, J., Buschmann, T., Lohmann, G., Margulies, D.S., \
Trampel, R., and Turner, R. (2014). Prioritizing spatial accuracy in \
high-resolution fMRI data using multivariate feature weight mapping. \
Front. Neurosci., http://dx.doi.org/10.3389/fnins.2014.00066.
    .. [2] Haufe, S., Meinecke, F., Görgen, K., Dähne, S., Haynes, J.-D., \
    Blankertz, B., and Bießmann, F. (2014). On the interpretation \
    of weight vectors of linear models in multivariate neuroimaging. \
    NeuroImage, 87, 96-110.
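
    Examples
    --------
    A minimal sketch of the intended fold-wise workflow. The ``mvp``
    object, the pipeline ``pipe``, and the cross-validation setup are
    hypothetical placeholders, not part of this class::

        from sklearn.model_selection import StratifiedKFold
        from sklearn.metrics import accuracy_score

        res = MvpResults(mvp, n_iter=5, type_model='classification',
                         feature_scoring='fwm', accuracy=accuracy_score)

        cv = StratifiedKFold(n_splits=5)
        for train_idx, test_idx in cv.split(mvp.X, mvp.y):
            pipe.fit(mvp.X[train_idx], mvp.y[train_idx])
            y_pred = pipe.predict(mvp.X[test_idx])
            res.update(test_idx, y_pred, pipeline=pipe)

        res.compute_scores()
        res.write(out_path='.')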
"""
def __init__(self, mvp, n_iter, type_model='classification',
feature_scoring=None, confmat=False, verbose=False,
**metrics):
for name, metric in metrics.items():
setattr(self, name, np.zeros(n_iter))
if type_model != 'classification':
confmat = False
self.n_class = len(np.unique(mvp.y))
self.n_iter = n_iter
self.n_vox = np.zeros(self.n_iter)
self.mvp = mvp
self.fs = feature_scoring
self.verbose = verbose
self.X = mvp.X
self.y = mvp.y
self.data_shape = mvp.data_shape
self.data_name = mvp.data_name
self.affine = mvp.affine
self.voxel_idx = mvp.voxel_idx
self.featureset_id = mvp.featureset_id
self.iter = 0
self.voxel_values = None
self.df = None
self.metrics = metrics
if type_model == 'classification':
if self.n_class < 3 or self.fs == 'ufs':
self.voxel_values = np.zeros((self.n_iter, mvp.X.shape[1]))
else:
self.voxel_values = np.zeros((self.n_iter, mvp.X.shape[1],
self.n_class))
else:
self.voxel_values = np.zeros((self.n_iter, mvp.X.shape[1]))
if confmat:
self.metrics['confmat'] = confusion_matrix
self.confmat = np.zeros((self.n_iter, self.n_class, self.n_class))
    def save_model(self, model, out_path):
        """ Method to serialize model(s) to disk.

        Parameters
        ----------
        model : pipeline or scikit-learn estimator
            Model to be saved.
        out_path : str
            Directory to which the model file(s) will be written.
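
        Examples
        --------
        A hypothetical call, assuming ``res`` is an MvpResults instance
        and ``pipe`` is a fitted scikit-learn Pipeline::

            res.save_model(pipe, out_path='/tmp/models')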
"""
        # Can also be a pipeline!
        if model.__class__.__name__ == 'Pipeline':
            steps = model.steps
        else:
            # wrap a bare estimator so it is handled like a one-step pipeline
            steps = [(model.__class__.__name__, model)]

        for name, step in steps:
            fn = op.join(out_path, name + '.jl')
            joblib.dump(step, fn)
    def load_model(self, path, param=None):
        """ Load model or pipeline from disk.

        Parameters
        ----------
        path : str
            Absolute path to model.
        param : str or list of str
            Which, if any, specific parameter(s) need to be loaded.
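
        Examples
        --------
        A hypothetical call that loads only the coefficients of a
        previously saved model (path and attribute are placeholders)::

            out = res.load_model('/tmp/models/SVC.jl', param='coef_')
            # out is a dict: {'coef_': <ndarray>}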
"""
model = joblib.load(path)
if param is None:
return model
else:
if not isinstance(param, list):
param = [param]
return {p: getattr(model, p) for p in param}
    def update(self, test_idx, y_pred, pipeline=None):
        """ Updates with information from current fold.

        Parameters
        ----------
        test_idx : ndarray
            Indices of current test-trials.
        y_pred : ndarray
            Predictions of current test-trials.
        pipeline : scikit-learn Pipeline object
            Pipeline from which relevant scores/coefficients will be
            extracted.
        """
i = self.iter
y_true = self.y[test_idx]
for name, metric in self.metrics.items():
tmp = getattr(self, name)
tmp[i] = metric(y_true, y_pred)
setattr(self, name, tmp)
if self.verbose:
print("%s: %.3f" % (name, tmp[i]))
if pipeline is not None and self.fs is not None:
self._update_voxel_values(pipeline)
self.iter += 1
    def compute_scores(self, multiclass='ovr', maps_to_tstat=True):
        """ Computes scores across folds.

        Parameters
        ----------
        multiclass : str
            How feature-scores from multiclass models were obtained:
            'ovr' (one-vs-rest) or 'ovo' (one-vs-one).
        maps_to_tstat : bool
            Whether to convert averaged feature-scores to t-statistics.
        """
self._check_mvp_attributes()
        df = {name: getattr(self, name)
              for name in self.metrics if name != 'confmat'}
if self.fs is not None:
df['n_voxels'] = self.n_vox
self.df = pd.DataFrame(df)
print(self.df.describe().loc[['mean', 'std']])
if self.fs is not None:
feature_scores = self._calculate_feature_scores(multiclass,
maps_to_tstat)
            if len(feature_scores) == 1:
                feature_scores = feature_scores[0]
            self.feature_scores = feature_scores
            return self.df, feature_scores
        else:
            return self.df
    def write(self, out_path, confmat=True, to_tstat=True,
              multiclass='ovr'):
        """ Writes results to disk.

        Parameters
        ----------
        out_path : str
            Where to save the results to.
        confmat : bool
            Whether to write out the confusion-matrix (across folds), if
            it was kept track of.
        to_tstat : bool
            Whether to convert averaged feature-scores to t-statistics (by
            dividing the mean score per voxel by its standard error,
            score.std(axis=0) / sqrt(n_iter - 1)).
        multiclass : str
            How feature-scores from multiclass models were obtained:
            'ovr' (one-vs-rest) or 'ovo' (one-vs-one).
        """
if self.df is None:
raise ValueError("Cannot write out results; "
"call compute_scores() first!")
self.df.loc[len(self.df)] = [np.nan] * self.df.shape[1]
self.df.loc[len(self.df)] = self.df.mean()
self.df.to_csv(op.join(out_path, 'results.tsv'),
sep='\t', index=False)
        if confmat and hasattr(self, 'confmat'):
            np.save(op.join(out_path, 'confmat'), self.confmat)
if self.fs is not None:
if not isinstance(self.feature_scores, list):
fscores = [self.feature_scores]
else:
fscores = self.feature_scores
            for i, fscore in enumerate(fscores):
                # fscores are ordered like np.unique(self.featureset_id)
                pos_idx = i
                if len(fscore.shape) > 3:
                    # 4D image (one volume per class): write each
                    # class-map to a separate file
                    for ii in range(fscore.shape[-1]):
                        vol = fscore.get_fdata()[..., ii]
                        nib.Nifti1Image(vol, affine=fscore.affine).to_filename(
                            op.join(out_path, self.data_name[pos_idx] +
                                    '_%i.nii.gz' % ii))
                else:
                    fscore.to_filename(op.join(out_path,
                                               self.data_name[pos_idx] +
                                               '.nii.gz'))
    def _calculate_feature_scores(self, multiclass, to_tstat):
        values = self.voxel_values

        if multiclass == 'ovo':
            # in scikit-learn's 'ovo' scheme, positive labels are reversed
            values *= -1
            n_class = len(np.unique(self.mvp.y))
            n_models = comb(n_class, 2, exact=True)
            # one pairwise model per combination of two classes
            cmb = list(combinations(range(n_class), 2))
            scores = np.zeros((values.shape[0], values.shape[1], n_class))

            # aggregate the n_models pairwise weight-maps into one
            # score-map per class
            for number in range(n_class):
                for i, c in enumerate(cmb):
                    if number in c:
                        if c.index(number) == 1:
                            val = values[:, :, i] * -1
                        else:
                            val = values[:, :, i]
                        scores[:, :, number] += val

            values = scores / n_class
nonzero_idx = values.sum(axis=0) != 0
if to_tstat:
n = values.shape[0]
m_values = values[:, nonzero_idx].mean(axis=0)
se_values = values[:, nonzero_idx].std(axis=0) / np.sqrt(n - 1)
values = values.sum(axis=0)
values[nonzero_idx] = m_values / se_values
else:
m_values = values[:, nonzero_idx].mean(axis=0)
values = values.sum(axis=0)
values[nonzero_idx] = m_values
        fids = np.unique(self.featureset_id)
        to_return = []

        for i in fids:
            pos_idx = np.where(i == fids)[0][0]
            subset = values[self.featureset_id == i]
            tmp_idx = self.voxel_idx[self.featureset_id == i]

            if subset.ndim > 1:
                # multiclass: one score-map per class, stacked into a
                # single 4D image
                n_class = subset.shape[1]
                img = np.zeros((np.prod(self.data_shape[pos_idx]), n_class))
                img[tmp_idx] = subset
                img = nib.Nifti1Image(
                    img.reshape(tuple(self.data_shape[pos_idx]) + (n_class,)),
                    affine=self.affine[pos_idx])
            else:
                img = np.zeros(self.data_shape[pos_idx]).ravel()
                img[tmp_idx] = subset
                img = nib.Nifti1Image(img.reshape(self.data_shape[pos_idx]),
                                      affine=self.affine[pos_idx])

            to_return.append(img)

        return to_return
def _check_mvp_attributes(self):
if not isinstance(self.affine, list):
self.affine = [self.affine]
if not isinstance(self.data_shape, list):
self.data_shape = [self.data_shape]
if not isinstance(self.data_name, list):
self.data_name = [self.data_name]
def _extract_values_from_pipeline(self, pipe):
if pipe.__class__.__name__ == 'GridSearchCV':
pipe = pipe.best_estimator_
elif pipe.__class__.__name__ != 'Pipeline':
# hack to allow non-pipelines
pipe.idx_ = np.ones(pipe.coef_.size, dtype=bool)
pipe_steps = {pipe.__class__.__name__: pipe}
else:
pipe_steps = copy(pipe.named_steps)
for name, step in pipe_steps.items():
if hasattr(step, 'best_estimator_'):
pipe_steps[name] = step.best_estimator_
else:
pipe_steps[name] = step
match = 'coef_' if self.fs in ['fwm', 'forward'] else 'scores_'
val = [getattr(step, match) for step in pipe_steps.values()
if hasattr(step, match)]
ensemble = [step for step in pipe_steps.values()
if hasattr(step, 'estimators_')]
if len(val) == 1:
val = val[0]
        elif len(val) == 0 and len(ensemble) == 1:
            # average the coefficients of the ensemble's sub-estimators
            val = np.concatenate([est.coef_ for est in
                                  ensemble[0].estimators_]).mean(axis=0)
elif len(val) == 0:
raise ValueError('Found no %s attribute anywhere in the '
'pipeline!' % match)
else:
raise ValueError('Found more than one %s attribute in the '
'pipeline!' % match)
idx = [step.get_support() for step in pipe_steps.values()
if callable(getattr(step, "get_support", None))]
if len(idx) == 0:
idx = [getattr(step, 'idx_') for step in pipe_steps.values()
if hasattr(step, 'idx_')]
if len(idx) == 1:
idx = idx[0]
elif len(idx) > 1:
msg = 'Found more than one index in pipeline!'
raise ValueError(msg)
else:
msg = 'Found no index in pipeline! Assuming no voxel selection.'
print(msg)
idx = np.ones(self.X.shape[1], dtype=bool)
val = np.squeeze(val)
if val.shape[0] != idx.sum():
val = val.T
return val, idx
def _update_voxel_values(self, pipe):
val, idx = self._extract_values_from_pipeline(pipe)
self.n_vox[self.iter] = val.shape[0]
if self.fs == 'fwm':
self.voxel_values[self.iter, idx] = val
elif self.fs == 'ufs':
self.voxel_values[self.iter, :] = val
elif self.fs == 'forward':
A = self._calculate_forward_mapping(val, idx)
self.voxel_values[self.iter, idx] = A
else:
msg = "Please specify either 'ufs', 'fwm', or 'forward'."
raise ValueError(msg)
def _calculate_forward_mapping(self, val, idx):
# Haufe et al. (2014). On the interpretation of weight vectors of
# linear models in multivariate neuroimaging. Neuroimage, 87, 96-110.
W = val
X = self.X[:, idx]
        # Forward mapping (Haufe et al., 2014): project the backward-model
        # weights onto the data covariance, A = Cov(X) . W
        A = np.cov(X.T).dot(W)
return A


class MvpAverageResults(object):
    """
    Averages results from MVPA analyses on, for example, different subjects
    or different ROIs.

    Parameters
    ----------
    mvp_results_list : list
        List with MvpResults objects (after updating across folds)
    identifiers : list of str
        List of identifiers (e.g. subject-name) that correspond to the
        different MvpResults objects
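
    Examples
    --------
    A hypothetical sketch, assuming ``results`` is a list of MvpResults
    objects (one per subject) that have each been updated across folds::

        avg = MvpAverageResults(results, identifiers=['sub-01', 'sub-02'])
        df = avg.compute_statistics(metric='accuracy', h0=0.5)
        avg.write('.', name='average_results')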
"""
def __init__(self, mvp_results_list, identifiers=None):
self.mvp_results_list = mvp_results_list
self.identifiers = identifiers
    def compute_statistics(self, metric='accuracy', h0=0.5):
        """ Computes statistics across MvpResults objects.

        Parameters
        ----------
        metric : str
            Which metric should be used in the MvpResults dataframes.
        h0 : float
            The null-hypothesis in terms of model performance (e.g. accuracy
            equals 1 / n_classes).
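
        Examples
        --------
        The t-value per MvpResults object is a one-sample t-test of its
        fold-wise scores against ``h0``:
        t = (mean - h0) / (std / sqrt(n - 1)), with n the number of folds.
        For example, five fold-accuracies with mean 0.6 and std 0.05
        tested against h0=0.5 give t = (0.6 - 0.5) / (0.05 / 2) = 4.0.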
"""
if self.identifiers is None:
self.identifiers = np.arange(len(self.mvp_results_list))
scores = np.array([getattr(mvpr, metric)
for mvpr in self.mvp_results_list])
n = scores.shape[1]
df = dict(mean=scores.mean(axis=1),
std=scores.std(axis=1))
df['t'] = (df['mean'] - h0) / (df['std'] / np.sqrt(n - 1))
df['p'] = [stats.t.sf(abs(tt), n - 1) for tt in df['t']]
        # average number of voxels across folds, one value per MvpResults
        df['n_vox'] = [mvp.n_vox.mean() for mvp in self.mvp_results_list]
df = pd.DataFrame(df, index=self.identifiers)
df = df.sort_values(by='t', ascending=False)
self.df = df
return df
    def write(self, path, name='average_results'):
        """ Writes the averaged results to disk as a tab-separated file. """
fn = op.join(path, name + '.tsv')
self.df.to_csv(fn, sep='\t')