Source code for skbold.core.mvp_between

from __future__ import division, print_function, absolute_import

import os
import re
import pickle
import warnings
import os.path as op
import pandas as pd
import numpy as np
import nibabel as nib
from io import open
from glob import glob
from fnmatch import fnmatch
from .mvp import Mvp
from sklearn.preprocessing import Imputer
from sklearn.preprocessing import StandardScaler, LabelEncoder
from ..preproc import MajorityUndersampler, LabelBinarizer


[docs]class MvpBetween(Mvp): """ Extracts and stores multivoxel pattern information across subjects. The MvpBetween class allows for the extraction and storage of multivoxel (MRI) pattern information across subjects. The MvpBetween class can handle various types of information, including functional contrasts, 3D (subject-specific) and 4D (subjects stacked) VBM and TBSS data, dual-regression data, and functional-connectivity data from resting-state scans (experimental). Parameters ---------- source : dict Dictionary with types of data as keys and data-specific dictionaries as values. Keys can be 'Contrast_*' (indicating a 3D functional contrast), '4D_anat' (for 4D anatomical - VBM/TBSS - files), 'VBM', 'TBSS', and 'dual_reg' (a subject-spedific 4D file with components as fourth dimension). The dictionary passed as values must include, for each data-type, a path with wildcards to the corresponding (subject-specific) data-file. Other, optional, key-value pairs per data-type can be assigned, including 'mask': 'path', to use data-type-specific masks. An example: >>> source = {} >>> path_emo = '~/data/sub0*/*.feat/stats/tstat1.nii.gz' >>> source['Contrast_emo'] = {'path': path_emo} >>> vbm_mask = '~/vbm_mask.nii.gz' >>> path_vbm = '~/data/sub0*/*vbm.nii.gz' >>> source['VBM'] = {'path': path_vbm, 'mask': vbm_mask} subject_idf : str Subject-identifier. This identifier is used to extract subject-names from the globbed directories in the 'path' keys in source, so that it is known which pattern belongs to which subject. This way, MvpBetween can check which subjects contain complete data! X : ndarray Not necessary to pass MvpWithin, but needs to be defined as it is needed in the super-constructor. y : ndarray or list Labels or targets corresponding to the samples in ``X``. mask : str Absolute path to nifti-file that will be used as a common mask. Note: this will only be applied if its shape corresponds to the to-be-indexed data. Otherwise, no mask is applied. Also, this mask is 'overridden' if source[data_type] contains a 'mask' key, which implies that this particular data-type has a custom mask. mask_threshold : int or float Minimum value to binarize the mask when it's probabilistic. Attributes ---------- mask_shape : tuple Shape of mask that patterns will be indexed with. nifti_header : list of Nifti1Header objects Nifti-headers from original data-types. affine : list of ndarray Affines corresponding to nifti-masks of each data-type. X : ndarray The actual patterns (2D: samples X features) y : list or ndarray Array/list with labels/targets corresponding to samples in X. common_subjects : list List of subject-names that have complete data specified in source. featureset_id : ndarray Array with integers of size X.shape[1] (i.e. the amount of features in X). Each unique integer, starting at 0, refers to a different feature-set. voxel_idx : ndarray Array with integers of size X.shape[1]. Per feature-set, these voxel- indices allow the features to be mapped back to whole-brain space. For example, to map back the features in X from feature set 1 to MNI152 (2mm) space, do: >>> mni_vol = np.zeros((91, 109, 91)) >>> tmp_idx = mvp.featureset_id == 0 >>> mni_vol[mvp.featureset_id[tmp_idx]] = mvp.X[0, tmp_idx] data_shape : list of tuples Original (whole-brain) shape of the loaded data, per data-type. data_name : list of str List of names of data-types. """ def __init__(self, source, subject_idf='sub0???', remove_zeros=True, X=None, y=None, mask=None, mask_thres=0, subject_list=None): super(MvpBetween, self).__init__(X=X, y=y, mask=mask, mask_thres=mask_thres) self.source = source self.remove_zeros = remove_zeros self.subject_idf = subject_idf self.subject_list = subject_list self.ref_space = 'mni' self.common_subjects = None self.y = y self.X = [] self.fs_masks = [] # featureset-specific masks self.featureset_id = [] self.affine = [] # This could be an array self.nifti_header = [] self.voxel_idx = [] self.data_shape = [] # This could be an array self.data_name = [] self.binarize_params = None if not isinstance(source, dict): msg = "Source must be a dictionary with type (e.g. 'VBM') " \ "path ('path/to/VBM_file.nii.gz) mappings!" raise TypeError(msg)
[docs] def create(self): """ Extracts and stores data as specified in source. Raises ------ ValueError If data-type is not one of ['VBM', 'TBSS', '4D_anat*', 'dual_reg', 'Contrast*'] """ # Globs all paths and checks for which subs there is complete data self._check_complete_data() # Loop over data-types as defined in source for data_type, args in self.source.items(): print('Processing: %s ...' % data_type) self.data_name.append(data_type) if 'mask' in args.keys(): maskl = nib.load(args['mask']) if 'mask_threshold' in args.keys(): th = args['mask_threshold'] else: th = 0 self.fs_masks.append({'path': args['mask'], 'threshold': th, 'affine': maskl.affine, 'shape': maskl.shape}) else: self.fs_masks.append(self.common_mask) TYPES_3D = ['VBM', 'TBSS', 'Contrast*'] if any(fnmatch(data_type, typ) for typ in TYPES_3D): self._load_3D(args) elif data_type == 'dual_reg': self._load_dual_reg(args) elif '4D_func' in data_type: self._load_4D_func(args) elif '4D_anat' in data_type: self._load_4D_anat(args) else: allowed = ['VBM', 'dual_reg', 'TBSS', 'Contrast', '4D_func', '4D_anat'] msg = "Data-type '%s' not recognized; please use one of the " \ "following: %r" % (data_type, allowed) raise KeyError(msg) if self.remove_zeros: self._remove_zeros() self.X = np.concatenate(self.X, axis=1) self.featureset_id = np.concatenate(self.featureset_id, axis=0) self.voxel_idx = np.concatenate(self.voxel_idx, axis=0) # 'Safety-check' to see whether everything corresponds properly assert(self.X.shape[1] == self.featureset_id.shape[0]) all_vox = np.sum([self.voxel_idx[i].size for i in range(len(self.voxel_idx))]) assert(self.X.shape[1] == all_vox) print("Final size of array: %r" % list(self.X.shape))
def _remove_zeros(self): # If remove_zeros, all columns with all zeros are removed to space if self.remove_zeros: indices = [np.invert(x == 0).all(axis=0) for x in self.X] for i, index in enumerate(indices): # Also other attributes are adapted to new shape self.X[i] = self.X[i][:, index] self.voxel_idx[i] = self.voxel_idx[i][index] self.featureset_id[i] = self.featureset_id[i][index] def _read_behav_file(self, file_path, sep, index_col, **kwargs): """ Reads in a tabular file using pandas read_csv """ df = pd.read_csv(file_path, sep=sep, index_col=index_col, **kwargs) df.index = [str(i) for i in df.index.tolist()] return df
[docs] def update_sample(self, idx): """ Updates the data matrix and associated attributes.""" self._update_common_subjects(idx) self.X = self.X[idx, :] if self.y is not None: self.y = self.y[idx]
def _undersample_majority(self): if len(np.unique(self.y)) > 5: msg = ("Found >5 classes and attempting to perform majority " "undersampling - are you sure the data is categorical?") warnings.warn(msg) self.y = LabelEncoder().fit(self.y).transform(self.y) mus = MajorityUndersampler(verbose=True) self.X, self.y = mus.fit().transform(self.X, self.y) self._update_common_subjects(mus.idx_) def _update_common_subjects(self, idx): """ Updates common_subjects after indexing. """ self.common_subjects = [sub for i, sub in enumerate(self.common_subjects) if idx[i]]
[docs] def add_y(self, file_path, col_name, sep='\t', index_col=0, normalize=False, remove=None, ensure_balanced=False, nan_strategy='remove', **kwargs): """ Sets ``y`` attribute to an outcome-variable (target). Parameters ---------- file_path : str Absolute path to spreadsheet-like file including the outcome var. col_name : str Column name in spreadsheet containing the outcome variable sep : str Separator to parse the spreadsheet-like file. index_col : int Which column to use as index (should correspond to subject-name). normalize : bool Whether to normalize (0 mean, unit std) the outcome variable. remove : int or float or str Removes instances in which y == remove from MvpBetween object. ensure_balanced : bool Whether to ensure balanced classes (if True, done by undersampling the majority class). nan_strategy : str Strategy on how to deal with NaNs. Default: 'remove'. Also, a specific string, int, or float can be specified when you want to impute a specific value. Other options, see: sklearn.preprocessing.Imputer. **kwargs Arbitrary keyword arguments passed to pandas read_csv. """ # Assumes index corresponds to self.common_subjects df = self._read_behav_file(file_path=file_path, sep=sep, index_col=index_col, **kwargs) df.index = check_zeropadding_and_sort(df.index.tolist()) common_idx = df.index.isin(self.common_subjects) behav = df.loc[common_idx, col_name] if behav.empty: msg = ("Couldnt find any data common to .common_subjects in the " "MvpBetween object!") raise ValueError(msg) self.y = np.array(behav) if remove is not None: idx = self.y != remove self.y = self.y[idx] self.X = self.X[idx, :] self._update_common_subjects(idx) self.y, idx = self._deal_with_missing_values(self.y, nan_strategy) if normalize: self.y = (self.y - self.y.mean()) / self.y.std() if ensure_balanced: self._undersample_majority()
def _deal_with_missing_values(self, arr, nan_strategy): """ Removes or imputes missing values. """ possibilities = ['remove', 'mean', 'median', 'most_frequent', 'depends'] if nan_strategy == 'depends': cont = len(np.unique(arr)) > 5 nan_strategy = 'mean' if cont else 'most_frequent' if arr.ndim < 2: arr = arr[:, np.newaxis] if nan_strategy == 'remove': idx = ~np.isnan(arr).any(axis=1) arr = arr[idx] elif nan_strategy not in possibilities: arr[np.isnan(arr)] = nan_strategy idx = None else: imp = Imputer(strategy=nan_strategy, axis=0) arr = imp.fit_transform(arr) idx = None arr = np.squeeze(arr) if idx is not None: self.y = self.y[idx] self.X = self.X[idx, :] self._update_common_subjects(idx) return arr, idx
[docs] def apply_binarization_params(self, param_file, ensure_balanced=False): """ Applies binarization-parameters to y. """ with open(param_file, 'rb') as fin: params = pickle.load(fin) if params['type'] == 'zscore': y_norm = (self.y - params['mean']) / params['std'] idx = np.abs(y_norm) > params['n_std'] y = (y_norm[idx] > 0).astype(int) else: msg = ("Apply binarization params other than 'zscore is " "not yet implemented.") raise ValueError(msg) self.y = y if idx is not None: self._update_common_subjects(idx) self.X = self.X[idx, :] if ensure_balanced: self._undersample_majority()
[docs] def binarize_y(self, params, save_path=None, ensure_balanced=False): """ Binarizes mvp's y-attribute using a specified method. Parameters ---------- params : dict The outcome variable (y) will be binarized along the key-value pairs in the params-argument. Options: >>> params = {'type': 'percentile', 'high': 75, 'low': 25} >>> params = {'type': 'zscore', 'std': 1} >>> params = {'type': 'constant', 'cutoff': 10} >>> params = {'type': 'median'} save_path : str If not None (default), this should be an absolute path referring to where the binarization-params should be saved. ensure_balanced : bool Whether to ensure balanced classes (if True, done by undersampling the majority class). """ labb = LabelBinarizer(params) self.X, self.y = labb.fit().transform(self.X, self.y) if labb.idx_ is not None: self._update_common_subjects(labb.idx_) if ensure_balanced: self._undersample_majority() if save_path is not None: with open(op.join(save_path, 'binarize_params.pkl'), 'wb') as w: pickle.dump(labb.binarize_params, w)
[docs] def split(self, file_path, col_name, target, sep='\t', index_col=0, nan_strategy='train', **kwargs): """ Splits an MvpBetween object based on some external index. Parameters ---------- file_path : str Absolute path to spreadsheet-like file including the outcome var. col_name : str Column name in spreadsheet containing the outcome variable target : str or int or float Target to which the data in col_name needs to be compared to, in order to create an index. sep : str Separator to parse the spreadsheet-like file. index_col : int Which column to use as index (should correspond to subject-name). nan_strategy : str Which value to impute if the labeling is absent. Default: 'train'. **kwargs Arbitrary keyword arguments passed to pandas read_csv. """ # Assumes index corresponds to self.common_subjects df = self._read_behav_file(file_path=file_path, sep=sep, index_col=index_col, **kwargs) common_idx = df.index.isin(self.common_subjects) behav = df.loc[common_idx, col_name] if behav.empty: print('Couldnt find any data common to .common_subjects in ' ' the MvpBetween object!') return 0 behav.index = check_zeropadding_and_sort(behav.index.tolist()) behav[behav.isnull()] = nan_strategy idx = np.array(behav) == target if idx.sum() == 0: raise ValueError('Found 0 subjects for split with target: %s' % str(target)) else: print("Splitting mvp with target '%s', found %i subjects." % (str(target), idx.sum())) self.X = self.X[idx, :] if self.y is not None: if len(self.y) > self.X.shape[0]: self.y = self.y[idx] self.common_subjects = [sub for i, sub in enumerate(self.common_subjects) if idx[i]]
[docs] def run_searchlight(self, out_dir, name='sl_results', n_folds=10, radius=5, mask=None, estimator=None, **kwargs): """ Runs a searchlight on the mvp object. Parameters ---------- out_dir : str Path to which to save the searchlight results name : str Name for the searchlight-results-file (nifti) n_folds : int The amount of folds in sklearn's StratifiedKFold. radius : int/list Radius for the searchlight. If list, it iterates over radii. mask : str Path to mask to apply to mvp. If nothing is listed, it will use the masks applied when the mvp was created. estimator : sklearn estimator or pipeline Estimator to use in the classification process. **kwargs Other keyword arguments for initializing nilearn's searchlight object (see nilearn.github.io/decoding/searchlight.html). """ # to do: implement import searchlight here (so skbold does not # necessarily depend on nilearn # NOT YET TESTED # Import OLD version from sklearn.model_selection import StratifiedKFold from sklearn.svm import SVC from sklearn.pipeline import Pipeline from nilearn.decoding import SearchLight nimgs = self.write_4D(return_nimg=True) cv = StratifiedKFold(self.y, n_folds=n_folds) if estimator is None: estimator = Pipeline([('scaler', StandardScaler()), ('svm', SVC(kernel='linear', C=1))]) if isinstance(radius, (int, float)): radius = [radius] for i, nimg in enumerate(nimgs): if mask is None: mask_tmp = self.fs_masks[i] if mask_tmp is not None: mask_bool = mask_tmp['idx'].astype(int) else: mask_bool = np.ones(nimg.shape, dtype=bool) for r in radius: sl = SearchLight(mask_img=mask_bool, radius=r, n_jobs=-1, cv=cv, estimator=estimator, scoring='accuracy', **kwargs) sl.fit(nimg, y=self.y) sl_nifti = nib.Nifti1Image(sl.scores_, nimg.affine) nib.save(sl_nifti, op.join(out_dir, name + '_%imm.nii.gz' % r))
[docs] def write_4D(self, path=None, return_nimg=False): """ Writes a 4D nifti (subs = 4th dimension) of X. Parameters ---------- path : str Absolute path to save nifti to. return_nimg : bool Whether to actually return the Nifti1-image object. """ if path is None: path = os.getcwd() fids = np.unique(self.featureset_id) nimgs = [] for i, fid in enumerate(fids): pos_idx = np.where(i == fids)[0][0] s = self.data_shape[pos_idx] to_write = np.zeros((np.prod(s), self.X.shape[0])) X_to_write = self.X[:, self.featureset_id == fid] to_write[self.voxel_idx[self.featureset_id == fid]] = X_to_write.T to_write = to_write.reshape((s[0], s[1], s[2], to_write.shape[-1])) img = nib.Nifti1Image(to_write, self.affine[pos_idx]) nimgs.append(img) if not return_nimg: img.to_filename(op.join(path, self.data_name[pos_idx]) + '.nii.gz') if return_nimg: if len(nimgs) == 1: return nimgs[0] else: return nimgs if self.y is not None: np.savetxt(op.join(path, 'y_4D_nifti.txt'), self.y, fmt='%1.4f', delimiter='\t')
def _load_4D_anat(self, args): # some checks tmp = nib.load(args['path']) if len(args['subjects']) != tmp.shape[-1]: msg = ("For 4D_anat, length of 'subjects' (%i) is different from " "amount of vols in nifti (%i)." % (len(args['subjects']), tmp.shape[-1])) raise ValueError(msg) args['subjects'] = check_zeropadding_and_sort(args['subjects']) idx = np.array([True if sub in self.common_subjects else False for sub in args['subjects']]) if tmp.shape[-1] > 500 and 'TBSS' in self.data_name[-1]: print('Loading TBSS data in two steps ...') # probably too large to load at once # cannot use idx, because nifti-slicing doesn't support that n_half = np.round(tmp.shape[-1] / 2.0).astype(int) data1 = tmp.dataobj[..., :n_half] data1 = data1[:, :, :, idx[:n_half]] data2 = tmp.dataobj[..., n_half:] data2 = data2[:, :, :, idx[n_half:]] data = np.concatenate((data1, data2), axis=-1) else: data = tmp.get_data() data = data[:, :, :, idx] voxel_idx = np.arange(np.prod(data.shape[:3])) tmp_mask = self.fs_masks[-1] if tmp_mask is None: tmp_mask = {'shape': data.shape[:3], 'idx': np.ones(np.prod(data.shape[:3]), dtype=bool)} if tmp_mask['shape'] == data.shape[:3]: data = data[tmp_mask['idx'].reshape(tmp.shape[:3])].T voxel_idx = voxel_idx[tmp_mask['idx']] else: data = data.reshape(-1, data.shape[-1]).T self.voxel_idx.append(voxel_idx) self.affine.append(tmp.affine) self.data_shape.append(tmp.shape[:3]) feature_ids = np.ones(data.shape[1], dtype=np.uint32) * len(self.X) self.featureset_id.append(feature_ids) self.X.append(data) def _load_dual_reg(self, args): data = [] # Kinda ugly, but switching loop over subjects and loop over comps # is not possible. for path in args['paths']: tmp = nib.load(path) tmp_shape = tmp.shape[:3] n_comps = tmp.shape[-1] if args['components'] is not None: final_comps = set.intersection(set(args['components']), set(range(1, n_comps + 1))) final_comps = [x - 1 for x in list(final_comps)] else: final_comps = range(n_comps) vols = [] for n_comp in final_comps: vol = tmp.dataobj[..., n_comp].ravel() tmp_mask = self.fs_masks[-1] if tmp_mask is None: tmp_mask = {'idx': np.ones(np.prod(vol.shape), dtype=bool), 'shape': tmp_shape} if tmp_mask['shape'] == tmp_shape: vol = vol[tmp_mask['idx']] vol = vol[np.newaxis, :] vols.append(vol) data.append(vols) # Hack to infer attributes by looking at last volume voxel_idx = np.arange(np.prod(tmp_shape)) tmp_mask = self.fs_masks[-1] if tmp_mask is None: tmp_mask = {'idx': np.ones(voxel_idx.size, dtype=bool), 'shape': tmp_shape} if tmp_mask['shape'] == tmp_shape: voxel_idx = voxel_idx[tmp_mask['idx']] _ = [self.voxel_idx.append(voxel_idx) for i in final_comps] _ = [self.data_shape.append(tmp_shape) for i in final_comps] _ = [self.affine.append(tmp.affine) for i in final_comps] _ = [self.nifti_header.append(tmp.header) for i in final_comps] name = self.data_name[-1] self.data_name.pop() _ = [self.data_name.append(name + '_comp%i' % i) for i in final_comps] for i in range(len(final_comps)): feature_ids = np.ones(vol.shape[1], dtype=np.uint32) * len(self.X) self.featureset_id.append(feature_ids) data_concat = np.concatenate([sub[i] for sub in data], axis=0) self.X.append(data_concat) def _load_3D(self, args): data = [] for path in args['paths']: tmp = nib.load(path) tmp_data = tmp.get_data().ravel() tmp_mask = self.fs_masks[-1] if tmp_mask is None: tmp_mask = {'idx': np.ones(tmp_data.size, dtype=bool), 'shape': tmp.shape} if tmp_mask['shape'] == tmp.shape: tmp_data = tmp_data[tmp_mask['idx']] data.append(tmp_data[np.newaxis, :]) voxel_idx = np.arange(np.prod(tmp.shape)) if tmp_mask['shape'] == tmp.shape: voxel_idx = voxel_idx[tmp_mask['idx']] self.voxel_idx.append(voxel_idx) self.affine.append(tmp.affine) self.data_shape.append(tmp.shape) data = np.concatenate(data, axis=0) feature_ids = np.ones(data.shape[1], dtype=np.uint32) * len(self.X) self.featureset_id.append(feature_ids) self.X.append(data) def _check_complete_data(self): for data_type, args in self.source.items(): if '4D_anat' in data_type: continue args['paths'] = check_zeropadding_and_sort(glob(args['path'])) ex_path = args['paths'][0].split(os.sep) idx = [True if fnmatch(p, self.subject_idf) else False for p in ex_path] position = list(np.arange(len(ex_path))[np.array(idx)]) if len(position) > 1: msg = ("Couldn't resolve to which subject the file '%s' " "belongs because subject-idf (%s) is ambiguous!" % (ex_path, self.subject_idf)) raise ValueError(msg) elif len(position) == 1: position = position[0] else: msg = ("Couldn't determine which subject belongs to which path" "for data-type = '%s'" % data_type) raise ValueError(msg) args['position'] = position args['subjects'] = [p.split(os.sep)[position] for p in args['paths']] all_subjects = [set(args['subjects']) for args in self.source.values()] if self.subject_list is not None: all_subjects.append(set(self.subject_list)) self.common_subjects = list(set.intersection(*all_subjects)) self.common_subjects = check_zeropadding_and_sort(self.common_subjects) print("Found a set of %i complete subjects for data-types: %r" % (len(self.common_subjects), [key for key in self.source])) for data_type, args in self.source.items(): if '4D_anat' in data_type: continue args['paths'] = [p for p in args['paths'] if p.split(os.sep)[args['position']] in self.common_subjects]
def _check_if_number(text): if text.isdigit(): return int(text) else: return text.lower() def _alphanum_key(key): return [_check_if_number(c) for c in re.split("([0-9]+)", key)]
[docs]def check_zeropadding_and_sort(lst): length = len(lst[0]) zero_pad = all(len(i) == length for i in lst) if zero_pad: return sorted(lst) else: return sorted(lst, key=_alphanum_key)