Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ENH] Registration tools #78

Open
wants to merge 15 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions dmriprep/config/emc_coarse_Affine.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
{
"level_iters": [1000, 100],
"metric": "MI",
"sigmas": [8.0, 2.0],
"factors": [2, 1],
"sampling_prop": 0.15,
"nbins": 48
}
8 changes: 8 additions & 0 deletions dmriprep/config/emc_coarse_Rigid.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
{
"level_iters": [100, 100],
"metric": "MI",
"sigmas": [8.0, 2.0],
"factors": [2, 1],
"sampling_prop": 0.15,
"nbins": 48
}
8 changes: 8 additions & 0 deletions dmriprep/config/emc_precise_Affine.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
{
"level_iters": [1000, 1000],
"metric": "MI",
"sigmas": [8.0, 2.0],
"factors": [2, 1],
"sampling_prop": 0.15,
"nbins": 48
}
8 changes: 8 additions & 0 deletions dmriprep/config/emc_precise_Rigid.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
{
"level_iters": [1000, 1000],
"metric": "MI",
"sigmas": [8.0, 2.0],
"factors": [2, 1],
"sampling_prop": 0.15,
"nbins": 48
}
147 changes: 147 additions & 0 deletions dmriprep/interfaces/registration.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
"""Register tools interfaces."""
import numpy as np
import nibabel as nb
import dmriprep
from nipype import logging
from pathlib import Path
from nipype.utils.filemanip import fname_presuffix
from nipype.interfaces.base import (
traits,
TraitedSpec,
BaseInterfaceInputSpec,
InputMultiObject,
SimpleInterface,
File,
)


LOGGER = logging.getLogger("nipype.interface")

dPys marked this conversation as resolved.
Show resolved Hide resolved

class _ApplyAffineInputSpec(BaseInterfaceInputSpec):
moving_image = File(
exists=True, mandatory=True, desc="image to apply transformation from"
dPys marked this conversation as resolved.
Show resolved Hide resolved
)
fixed_image = File(
exists=True, mandatory=True, desc="image to apply transformation to"
dPys marked this conversation as resolved.
Show resolved Hide resolved
)
transform_affine = InputMultiObject(
File(exists=True), mandatory=True, desc="transformation affine"
)
invert_transform = traits.Bool(False, usedefault=True)


class _ApplyAffineOutputSpec(TraitedSpec):
warped_image = File(exists=True, desc="Outputs warped image")


class ApplyAffine(SimpleInterface):
"""
Interface to apply an affine transformation to an image.
"""

input_spec = _ApplyAffineInputSpec
output_spec = _ApplyAffineOutputSpec

def _run_interface(self, runtime):
from dmriprep.utils.registration import apply_affine

warped_image_nifti = apply_affine(
nb.load(self.inputs.moving_image),
nb.load(self.inputs.fixed_image),
np.load(self.inputs.transform_affine[0]),
self.inputs.invert_transform,
)
cwd = Path(runtime.cwd).absolute()
warped_file = fname_presuffix(
self.inputs.moving_image,
use_ext=False,
suffix="_warped.nii.gz",
newpath=str(cwd),
)

warped_image_nifti.to_filename(warped_file)

self._results["warped_image"] = warped_file
return runtime


class _RegisterInputSpec(BaseInterfaceInputSpec):
moving_image = File(
exists=True, mandatory=True, desc="image to apply transformation from"
)
fixed_image = File(
exists=True, mandatory=True, desc="image to apply transformation to"
)
nbins = traits.Int(default_value=32, usedefault=True)
sampling_prop = traits.Float(default_value=1, usedefault=True)
metric = traits.Str(default_value="MI", usedefault=True)
level_iters = traits.List(
trait=traits.Any(), value=[10000, 1000, 100], usedefault=True
)
sigmas = traits.List(trait=traits.Any(), value=[5.0, 2.5, 0.0], usedefault=True)
factors = traits.List(trait=traits.Any(), value=[4, 2, 1], usedefault=True)
params0 = traits.ArrayOrNone(value=None, usedefault=True)
pipeline = traits.List(
trait=traits.Any(),
dPys marked this conversation as resolved.
Show resolved Hide resolved
value=["c_of_mass", "translation", "rigid", "affine"],
dPys marked this conversation as resolved.
Show resolved Hide resolved
usedefault=True,
)


class _RegisterOutputSpec(TraitedSpec):
forward_transforms = traits.List(
File(exists=True), desc="List of output transforms for forward registration"
)
warped_image = File(exists=True, desc="Outputs warped image")


class Register(SimpleInterface):
"""
Interface to perform affine registration.
"""

input_spec = _RegisterInputSpec
output_spec = _RegisterOutputSpec

def _run_interface(self, runtime):
from dmriprep.utils.registration import affine_registration

reg_types = ["c_of_mass", "translation", "rigid", "affine"]
dPys marked this conversation as resolved.
Show resolved Hide resolved
pipeline = [
getattr(dmriprep.utils.register, i)
for i in self.inputs.pipeline
if i in reg_types
dPys marked this conversation as resolved.
Show resolved Hide resolved
]

warped_image_nifti, forward_transform_mat = affine_registration(
nb.load(self.inputs.moving_image),
nb.load(self.inputs.fixed_image),
self.inputs.nbins,
self.inputs.sampling_prop,
self.inputs.metric,
pipeline,
self.inputs.level_iters,
self.inputs.sigmas,
self.inputs.factors,
self.inputs.params0,
)
cwd = Path(runtime.cwd).absolute()
warped_file = fname_presuffix(
self.inputs.moving_image,
use_ext=False,
suffix="_warped.nii.gz",
newpath=str(cwd),
)
forward_transform_file = fname_presuffix(
self.inputs.moving_image,
use_ext=False,
suffix="_forward_transform.npy",
newpath=str(cwd),
)
warped_image_nifti.to_filename(warped_file)

np.save(forward_transform_file, forward_transform_mat)
self._results["warped_image"] = warped_file
self._results["forward_transforms"] = [forward_transform_file]
return runtime
170 changes: 170 additions & 0 deletions dmriprep/utils/registration.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,170 @@
"""
Linear affine registration tools for motion correction.
"""
import numpy as np
import nibabel as nb
from dipy.align.metrics import CCMetric, EMMetric, SSDMetric
from dipy.align.imaffine import (
transform_centers_of_mass,
AffineMap,
MutualInformationMetric,
AffineRegistration,
)
from dipy.align.transforms import (
TranslationTransform3D,
RigidTransform3D,
AffineTransform3D,
)
from nipype.utils.filemanip import fname_presuffix

syn_metric_dict = {"CC": CCMetric, "EM": EMMetric, "SSD": SSDMetric}

__all__ = [
"c_of_mass",
"translation",
"rigid",
"affine",
"affine_registration",
]


def apply_affine(moving, static, transform_affine, invert=False):
"""Apply an affine to transform an image from one space to another.

Parameters
----------
moving : array
The image to be resampled

static : array

Returns
-------
warped_img : the moving array warped into the static array's space.

"""
affine_map = AffineMap(
transform_affine, static.shape, static.affine, moving.shape, moving.affine
)
if invert is True:
warped_arr = affine_map.transform_inverse(np.asarray(moving.dataobj))
else:
warped_arr = affine_map.transform(np.asarray(moving.dataobj))

return nb.Nifti1Image(warped_arr, static.affine)


def average_affines(transforms):
affine_list = [np.load(aff) for aff in transforms]
average_affine_file = fname_presuffix(
transforms[0], use_ext=False, suffix="_average.npy"
)
np.save(average_affine_file, np.mean(affine_list, axis=0))
return average_affine_file


# Affine registration pipeline:
affine_metric_dict = {"MI": MutualInformationMetric, "CC": CCMetric}


def c_of_mass(
moving, static, static_affine, moving_affine, reg, starting_affine, params0=None
):
transform = transform_centers_of_mass(static, static_affine, moving, moving_affine)
transformed = transform.transform(moving)
return transformed, transform.affine


def translation(
moving, static, static_affine, moving_affine, reg, starting_affine, params0=None
):
transform = TranslationTransform3D()
translation = reg.optimize(
static,
moving,
transform,
params0,
static_affine,
moving_affine,
starting_affine=starting_affine,
)

return translation.transform(moving), translation.affine


def rigid(
moving, static, static_affine, moving_affine, reg, starting_affine, params0=None
):
transform = RigidTransform3D()
rigid = reg.optimize(
static,
moving,
transform,
params0,
static_affine,
moving_affine,
starting_affine=starting_affine,
)
return rigid.transform(moving), rigid.affine


def affine(
moving, static, static_affine, moving_affine, reg, starting_affine, params0=None
):
transform = AffineTransform3D()
affine = reg.optimize(
static,
moving,
transform,
params0,
static_affine,
moving_affine,
starting_affine=starting_affine,
)

return affine.transform(moving), affine.affine


def affine_registration(
moving,
static,
nbins,
sampling_prop,
metric,
pipeline,
level_iters,
sigmas,
factors,
params0,
):

"""
Find the affine transformation between two 3D images.

Parameters
----------

"""
# Define the Affine registration object we'll use with the chosen metric:
use_metric = affine_metric_dict[metric](nbins, sampling_prop)
affreg = AffineRegistration(
metric=use_metric, level_iters=level_iters, sigmas=sigmas, factors=factors
)

if not params0:
starting_affine = np.eye(4)
else:
starting_affine = params0

# Go through the selected transformation:
for func in pipeline:
transformed, starting_affine = func(
np.asarray(moving.dataobj),
np.asarray(static.dataobj),
static.affine,
moving.affine,
affreg,
starting_affine,
params0,
)
return nb.Nifti1Image(np.array(transformed), static.affine), starting_affine
Loading