Source code for cortex.volume

"""Contains functions for working with volume data
"""
import os
import numpy as np

from . import dataset
from .database import db
from .xfm import Transform

[docs]def unmask(mask, data): """unmask(mask, data) Unmask the data, assuming it's been masked. Creates a volume the same size as `mask` containing `data` at the locations where `mask` is True. If `data` is RGB valued (dtype uint8 and last dim is 3 or 4), the area outside the mask will be filled with zeros. Otherwise, a numpy MaskedArray will be returned. Parameters ---------- mask : array_like The data mask data : array_like Actual MRI data to unmask Returns ------- unmasked : array_like Volume same size as `mask` but same dtype as `data`. """ nvox = mask.sum() if data.shape[0] == nvox: data = data[np.newaxis] elif nvox not in data.shape: raise ValueError('Invalid mask for the data') if data.shape[-1] in (3, 4): if data.dtype != np.uint8: raise TypeError('Invalid dtype for raw dataset') #raw dataset, unmask with an alpha channel output = np.zeros((len(data),)+mask.shape+(4,), dtype=np.uint8) output[:, mask > 0, :data.shape[-1]] = data if data.shape[-1] == 3: output[:, mask > 0, 3] = 255 else: #output = (np.nan*np.ones((len(data),)+mask.shape)).astype(data.dtype) outdata = np.zeros((len(data),)+mask.shape).astype(data.dtype) outdata[:, mask>0] = data outmask = np.tile(~mask[None,:,:,:], (len(data), 1, 1, 1)) output = np.ma.MaskedArray(outdata, mask=outmask) return output.squeeze()
def detrend_median(data, kernel=15): from scipy.signal import medfilt lowfreq = medfilt(data, [1, kernel, kernel]) return data - lowfreq def detrend_gradient(data, diff=3): return (np.array(np.gradient(data, 1, diff, diff))**2).sum(0) def detrend_poly(data, polyorder = 10, mask=None): from scipy.special import legendre polys = [legendre(i) for i in range(polyorder)] s = data.shape b = data.ravel()[:,np.newaxis] lins = np.mgrid[-1:1:s[0]*1j, -1:1:s[1]*1j, -1:1:s[2]*1j].reshape(3,-1) if mask is not None: lins = lins[:,mask.ravel() > 0] b = b[mask.ravel() > 0] A = np.vstack([[p(i) for i in lins] for p in polys]).T x, res, rank, sing = np.linalg.lstsq(A, b) detrended = b.ravel() - np.dot(A, x).ravel() if mask is not None: filled = np.zeros_like(mask) filled[mask > 0] = detrended return filled else: return detrended.reshape(*s)
[docs]def mosaic(data, dim=0, show=True, **kwargs): """ Turns volume data into a mosaic, useful for quickly viewing volumetric data with radiological convention (left side of figure is right side of subject). Parameters ---------- data : array_like 3D volumetric data to mosaic dim : int Dimension across which to mosaic. Default 0. show : bool Display mosaic with matplotlib? Default True. """ if data.ndim not in (3, 4): raise ValueError("Invalid data shape") plane = list(data.shape) slices = plane.pop(dim) height, width = plane[:2] aspect = width / float(height) square = np.sqrt(slices / aspect) nwide = int(np.ceil(square)) ntall = int(np.ceil(float(slices) / nwide)) shape = (ntall * (height+1) + 1, nwide * (width+1) + 1) if data.dtype == np.uint8: output = np.zeros(shape+(4,), dtype=np.uint8) else: output = (np.nan*np.ones(shape)).astype(data.dtype) sl = [slice(None), slice(None), slice(None)] for h in range(ntall): for w in range(nwide): sl[dim] = h*nwide+w if sl[dim] < slices: hsl = slice(h*(height+1)+1, (h+1)*(height+1)) wsl = slice(w*(width+1)+1, (w+1)*(width+1)) if data.dtype == np.uint8: output[hsl, wsl, :data.shape[3]] = data[tuple(sl)] if data.shape[3] == 3: output[hsl, wsl, 3] = 255 else: output[hsl, wsl] = data[tuple(sl)] if show: from matplotlib import pyplot as plt plt.imshow(output, **kwargs) plt.axis('off') return output, (nwide, ntall)
[docs]def show_slice(dataview, **kwargs): import nibabel from matplotlib import cm import matplotlib.pyplot as plt dataview = dataset.normalize(dataview) if not isinstance(dataview, dataset.Volume): raise TypeError('Only volumetric data may be visualized in show_slice') subject = dataview.subject xfmname = dataview.xfmname imshow_kw = dict(vmin=dataview.vmin, vmax=dataview.vmax, cmap=dataview.cmap) imshow_kw.update(kwargs) anat = db.get_anat(subject, 'raw').get_data().T data = epi2anatspace(dataview) data[data < dataview.vmin] = np.nan state = dict(slice=data.shape[0]*.66, dim=0, pad=()) fig = plt.figure() ax = fig.add_subplot(111) anatomical = ax.imshow(anat[state['pad'] + (state['slice'],)], cmap=cm.gray, aspect='equal') functional = ax.imshow(data[state['pad'] + (state['slice'],)], aspect='equal', **imshow_kw) def update(): print("showing dim %d, slice %d"%(state['dim'] % 3, state['slice'])) functional.set_data(data[state['pad'] + (state['slice'],)]) anatomical.set_data(anat[state['pad'] + (state['slice'],)]) fig.canvas.draw() def switchslice(event): state['dim'] += 1 state['pad'] = (slice(None),)*(state['dim'] % 3) update() def scrollslice(event): if event.button == 'up': state['slice'] += 1 elif event.button == 'down': state['slice'] -= 1 update() fig.canvas.mpl_connect('scroll_event', scrollslice) fig.canvas.mpl_connect('button_press_event', switchslice) return fig
[docs]def show_mip(data, **kwargs): '''Display a maximum intensity projection for the data, using three subplots''' import matplotlib.pyplot as plt fig = plt.figure() fig.add_subplot(221).imshow(data.max(0), **kwargs) fig.add_subplot(222).imshow(data.max(1), **kwargs) fig.add_subplot(223).imshow(data.max(2), **kwargs) return fig
[docs]def show_glass(dataview, pad=10): '''Create a classic "glass brain" view of the data, with the outline''' import nibabel nib = db.get_anat(subject, 'fiducial') mask = nib.get_data() left, right = np.nonzero(np.diff(mask.max(0).max(0)))[0][[0,-1]] front, back = np.nonzero(np.diff(mask.max(0).max(1)))[0][[0,-1]] top, bottom = np.nonzero(np.diff(mask.max(1).max(1)))[0][[0,-1]] glass = np.zeros((mask.shape[1], mask.shape[2]*2), dtype=bool) glass[:, :mask.shape[2]] = mask.max(0) #this requires a lot of logic to put the image into a canonical orientation #too much work for something we'll never use raise NotImplementedError
[docs]def epi2anatspace(volumedata, order=1): """Resample an epi volume data into anatomical space using scipy. Parameters ---------- volumedata : VolumeData The input epi volumedata object. order : int The order of the resampler, in terms of splines. 0 is nearest, 1 is linear. Returns ------- anatspace : ndarray The ND array of the anatomy space data """ from scipy.ndimage.interpolation import affine_transform ds = dataset.normalize(volumedata) volumedata = ds#.data anat = db.get_anat(volumedata.subject, "raw") xfm = db.get_xfm(volumedata.subject, volumedata.xfmname, "coord") #allxfm = Transform(anat.get_affine(), anat.shape).inv * xfm.inv allxfm = xfm * Transform(anat.get_affine(), anat.shape) rotpart = allxfm.xfm[:3, :3] transpart = allxfm.xfm[:3,-1] return affine_transform(volumedata.volume.T.squeeze(), rotpart, offset=transpart, output_shape=anat.shape[::-1], cval=np.nan, order=order).T
[docs]def anat2epispace(anatdata, subject, xfmname, order=1): """Resamples data from anatomical space into epi space Parameters ---------- anatdata : ndarray data in anatomical space subject : str Name of subject xfmname : str Name of transform order : int Order of spline interpolation Returns ------- epidata : ndarray data in EPI space """ from scipy.ndimage.interpolation import affine_transform anatref = db.get_anat(subject) target = db.get_xfm(subject, xfmname, "coord") allxfm = Transform(anatref.get_affine(), anatref.shape).inv * target.inv #allxfm = xfm * Transform(anat.get_affine(), anat.shape) rotpart = allxfm.xfm[:3, :3] transpart = allxfm.xfm[:3,-1] return affine_transform(anatdata.T, rotpart, offset=transpart, output_shape=target.shape[::-1], cval=np.nan, order=order).T
[docs]def epi2anatspace_fsl(volumedata): """Resamples epi-space [data] into the anatomical space for the given [subject] using the given transformation [xfm]. """ #This function is currently broken! do not use it! raise NotImplementedError import tempfile import subprocess import nibabel volumedata = dataset.normalize(volumedata).data subject = volumedata.subject xfmname = volumedata.xfmname data = volumedata.volume ## Get transform (pycortex estimates anat-to-epi) xfm = db.get_xfm(subject, xfmname) fslxfm = xfm.to_fsl(db.get_anat(subject, 'raw').get_filename()) ## Invert transform to epi-to-anat fslxfm = np.linalg.inv(fslxfm) ## Save out into ascii file xfmfilename = tempfile.mktemp(".mat") with open(xfmfilename, "w") as xfmh: for ll in fslxfm.tolist(): xfmh.write(" ".join(["%0.5f"%f for f in ll])+"\n") ## Save out data into nifti file datafile = nibabel.Nifti1Image(data.T, xfm.reference.get_affine(), xfm.reference.get_header()) datafilename = tempfile.mktemp(".nii") nibabel.save(datafile, datafilename) ## Reslice epi-space image raw = db.get_anat(subject, type='raw').get_filename() outfilename = tempfile.mktemp(".nii") subprocess.call(["fsl5.0-flirt", "-ref", raw, "-in", datafilename, "-applyxfm", "-init", xfmfilename, #"-interp", "sinc", "-out", outfilename]) ## Load resliced image outdata = nibabel.load(outfilename+".gz").get_data().T ## Clean up os.remove(outfilename+".gz") os.remove(datafilename) ## Done! return outdata
[docs]def anat2epispace_fsl(data,subject,xfmname): """Resamples anat-space data into the epi space for the given [subject] and transformation [xfm] """ import tempfile import subprocess import nibabel ## Get transform (pycortex estimates anat-to-epi) xfm = db.get_xfm(subject, xfmname) anatNII = db.get_anat(subject, type='raw') fslxfm = xfm.to_fsl(anatNII.get_filename()) ## Save out into ascii file xfmfilename = tempfile.mktemp(".mat") print('xfm file: %s'%xfmfilename) with open(xfmfilename, "w") as xfmh: for ll in fslxfm.tolist(): xfmh.write(" ".join(["%0.5f"%f for f in ll])+"\n") ## Save out data into nifti file datafile = nibabel.Nifti1Image(data.T, anatNII.get_affine(), anatNII.get_header()) datafilename = tempfile.mktemp(".nii") nibabel.save(datafile, datafilename) ## Reslice epi-space image epiNIIf = xfm.reference.get_filename() outfilename = tempfile.mktemp(".nii") subprocess.call(["fsl5.0-flirt", "-in", datafilename, "-ref", epiNIIf, "-out", outfilename, "-init", xfmfilename, #"-interp", "sinc", "-applyxfm"]) ## Load resliced image outdata = nibabel.load(outfilename+".gz").get_data().T ## Clean up os.remove(outfilename+".gz") os.remove(datafilename) ## Done! return outdata
def fslview(*ims): import tempfile import subprocess fnames = [] tempfiles = [] for im in ims: if not (isinstance(im, str) and os.path.exists(im)): import nibabel tf = tempfile.NamedTemporaryFile(suffix=".nii.gz") tempfiles.append(tf) nib = nibabel.Nifti1Image(im.T, np.eye(4)) nib.to_filename(tf.name) fnames.append(tf.name) else: fnames.append(im) subprocess.call(["fslview"] + fnames)