Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Overhaul of the Reconstruction Pipeline to use a scanobj #164

Merged
merged 9 commits into from
Apr 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
310 changes: 42 additions & 268 deletions brkraw/lib/recoFunctions.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,302 +3,76 @@
DEVELOPED FOR BRUKER PARAVISION 360 datasets
Functions below are made to support functions
in recon.py

@author: Tim Ho (UVA)
"""

from .utils import get_value
import numpy as np

def reco_qopts(frame, Reco, actual_framenumber):

# import variables
RECO_qopts = get_value(Reco, 'RECO_qopts')

# claculate additional parameters
dims = [frame.shape[0], frame.shape[1], frame.shape[2], frame.shape[3]]

# check if the qneg-Matrix is necessary:
use_qneg = False
if (RECO_qopts.count('QUAD_NEGATION') + RECO_qopts.count('CONJ_AND_QNEG')) >= 1:
use_qneg = True
qneg = np.ones(frame.shape) # Matrix containing QUAD_NEGATION multiplication matrix

# start process
for i in range(len(RECO_qopts)):
if RECO_qopts[i] == 'COMPLEX_CONJUGATE':
frame = np.conj(frame)
elif RECO_qopts[i] == 'QUAD_NEGATION':
if i == 0:
qneg = qneg * np.tile([[1, -1]], [np.ceil(dims[0]/2), dims[1], dims[2], dims[3]])
elif i == 1:
qneg = qneg * np.tile([[1], [-1]], [dims[0], np.ceil(dims[1]/2), dims[2], dims[3]])
elif i == 2:
tmp = np.zeros([1, 1, dims[2], 2])
tmp[0, 0, :, :] = [[1, -1]]
qneg = qneg * np.tile(tmp, [dims[0], dims[1], np.ceil(dims[2]/2), dims[3]])
elif i == 3:
tmp = np.zeros([1, 1, 1, dims[3], 2])
tmp[0, 0, 0, :, :] = [[1, -1]]
qneg = qneg * np.tile(tmp, [dims[0], dims[1], dims[2], np.ceil(dims[3]/2)])
elif RECO_qopts[i] == 'CONJ_AND_QNEG':
frame = np.conj(frame)
if i == 0:
qneg = qneg * np.tile([[1, -1]], [np.ceil(dims[0]/2), dims[1], dims[2], dims[3]])
elif i == 1:
qneg = qneg * np.tile([[1], [-1]], [dims[0], np.ceil(dims[1]/2), dims[2], dims[3]])
elif i == 2:
tmp = np.zeros([1, 1, dims[2], 2])
tmp[0, 0, :, :] = [[1, -1]]
qneg = qneg * np.tile(tmp, [dims[0], dims[1], np.ceil(dims[2]/2), dims[3]])
elif i == 3:
tmp = np.zeros([1, 1, 1, dims[3], 2])
tmp[0, 0, 0, :, :] = [[1, -1]]
qneg = qneg * np.tile(tmp, [dims[0], dims[1], dims[2], np.ceil(dims[3]/2)])

if use_qneg:
if qneg.shape != frame.shape:
qneg = qneg[0:dims[0], 0:dims[1], 0:dims[2], 0:dims[3]]
frame = frame * qneg

return frame


def reco_phase_rotate(frame, Reco, actual_framenumber):
import warnings

# import variables
RECO_rotate_all = get_value(Reco,'RECO_rotate')
def phase_rotate(frame, RECO_rotate, framenumber):

if RECO_rotate_all.shape[1] > actual_framenumber:
RECO_rotate = get_value(Reco,'RECO_rotate')[:, actual_framenumber]
if RECO_rotate.shape[1] > framenumber:
RECO_rotate = RECO_rotate[:, framenumber]
RECO_rotate -= 0.5
else:
RECO_rotate = get_value(Reco,'RECO_rotate')[:,0]
RECO_rotate = RECO_rotate[:,0]

if isinstance( get_value(Reco,'RECO_ft_mode'), list):
if any(x != get_value(Reco,'RECO_ft_mode')[0] for x in get_value(Reco,'RECO_ft_mode')):
raise ValueError('It''s not allowed to use different transfomations on different Dimensions: ' + Reco['RECO_ft_mode'])
RECO_ft_mode = get_value(Reco,'RECO_ft_mode')[0]
else:
RECO_ft_mode = get_value(Reco,'RECO_ft_mode')

# calculate additional variables
dims = [frame.shape[0], frame.shape[1], frame.shape[2], frame.shape[3]]
dims = [frame.shape[0], frame.shape[1], frame.shape[2]]

# start process
phase_matrix = np.ones_like(frame)
# Create Shift matrix in KSPACE
phase_matrix = np.ones(shape=dims,dtype=complex)
for index in range(len(RECO_rotate)):
f = np.arange(dims[index])

if RECO_ft_mode in ['COMPLEX_FT', 'COMPLEX_FFT']:
phase_vector = np.exp(1j*2*np.pi*RECO_rotate[index]*f)
elif RECO_ft_mode in ['NO_FT', 'NO_FFT']:
phase_vector = np.ones_like(f)
elif RECO_ft_mode in ['COMPLEX_IFT', 'COMPLEX_IFFT']:
phase_vector = np.exp(1j*2*np.pi*(1-RECO_rotate[index])*f)
else:
raise ValueError('Your RECO_ft_mode is not supported')

phase_vector = np.exp(1j*2*np.pi*(1-RECO_rotate[index])*f)
if index == 0:
phase_matrix *= np.tile(phase_vector[:,np.newaxis,np.newaxis,np.newaxis], [1, dims[1], dims[2], dims[3]])
phase_matrix *= np.tile(phase_vector[:,np.newaxis,np.newaxis], [1, dims[1], dims[2]])
elif index == 1:
phase_matrix *= np.tile(phase_vector[np.newaxis,:,np.newaxis,np.newaxis], [dims[0], 1, dims[2], dims[3]])
phase_matrix *= np.tile(phase_vector[np.newaxis,:,np.newaxis], [dims[0], 1, dims[2]])
elif index == 2:
tmp = np.zeros((1,1,dims[2],1), dtype=complex)
tmp[0,0,:,0] = phase_vector
phase_matrix *= np.tile(tmp, [dims[0], dims[1], 1, dims[3]])
elif index == 3:
tmp = np.zeros((1,1,1,dims[3]), dtype=complex)
tmp[0,0,0,:] = phase_vector
phase_matrix *= np.tile(tmp, [dims[0], dims[1], dims[2], 1])

frame *= phase_matrix
return frame


def reco_zero_filling(frame, Reco, actual_framenumber, signal_position):
# check input
RECO_ft_mode = get_value(Reco,'RECO_ft_mode')
tmp = np.zeros((1,1,dims[2]), dtype=complex)
tmp[0,0,:] = phase_vector
phase_matrix *= np.tile(tmp, [dims[0], dims[1], 1])

return phase_matrix

# Replace with zero padding
def zero_filling(frame, RECO_ft_size, signal_position=np.array([0.5,0.5,0.5])):
# Check if Reco.RECO_ft_size is not equal to size(frame)
not_Equal = any([(i != j) for i,j in zip(frame.shape,get_value(Reco, 'RECO_ft_size'))])

not_Equal = any([(i != j) for i,j in zip(frame.shape,RECO_ft_size)])
if not_Equal:
if any(signal_position > 1) or any(signal_position < 0):
raise ValueError('signal_position has to be a vector between 0 and 1')

RECO_ft_size = get_value(Reco,'RECO_ft_size')

# check if ft_size is correct:
for i in range(len(RECO_ft_size)):
if RECO_ft_size[i] < frame.shape[i]:
raise ValueError('RECO_ft_size has to be bigger than the size of your data-matrix')
warnings.warn('Signal needs to be between 0 and 1\nDefaulting to 0.5')
signal_position=np.array([0.5,0.5,0.5])

# calculate additional variables
dims = (frame.shape[0], frame.shape[1], frame.shape[2], frame.shape[3])
dims = (frame.shape[0], frame.shape[1], frame.shape[2])

# start process
newframe = np.zeros(RECO_ft_size, dtype=complex)
startpos = np.zeros(len(RECO_ft_size), dtype=int)
pos_ges = [None] * 3

# Dimensions of frame and RECO_ft_size doesn't match? -> zero filling
if not_Equal:
newframe = np.zeros(RECO_ft_size, dtype=complex)
startpos = np.zeros(len(RECO_ft_size), dtype=int)
pos_ges = [None] * 4

for i in range(len(RECO_ft_size)):
diff = RECO_ft_size[i] - frame.shape[i] + 1
startpos[i] = int(np.floor(diff * signal_position[i] + 1))
if startpos[i] > RECO_ft_size[i]:
startpos[i] = RECO_ft_size[i]
pos_ges[i] = slice(startpos[i] - 1, startpos[i] - 1 + dims[i])

newframe[pos_ges[0], pos_ges[1], pos_ges[2], pos_ges[3]] = frame
else:
newframe = frame

for i in range(len(RECO_ft_size)):
diff = RECO_ft_size[i] - frame.shape[i] + 1
startpos[i] = int(np.floor(diff * signal_position[i] + 1))
if startpos[i] > RECO_ft_size[i]:
startpos[i] = RECO_ft_size[i]
pos_ges[i] = slice(startpos[i] - 1, startpos[i] - 1 + dims[i])

newframe[pos_ges[0], pos_ges[1], pos_ges[2]] = frame
del startpos, pos_ges

else:
newframe = frame

return newframe


def reco_FT(frame, Reco, actual_framenumber):
"""
Perform Fourier Transform on the input frame according to the specified RECO_ft_mode in the Reco dictionary.

Args:
frame: ndarray
Input frame to perform Fourier Transform on
Reco: dict
Dictionary containing the specified Fourier Transform mode (RECO_ft_mode)
actual_framenumber: int
Index of the current frame

Returns:
frame: ndarray
Output frame after Fourier Transform has been applied
"""

# Import variables
RECO_ft_mode = get_value(Reco,'RECO_ft_mode')[0]

# Start process
if RECO_ft_mode in ['COMPLEX_FT', 'COMPLEX_FFT']:
frame = np.fft.fftn(frame)
#frame = sp.fft(frame, axes=[0,1,2], center=False)
elif RECO_ft_mode in ['NO_FT', 'NO_FFT']:
pass
elif RECO_ft_mode in ['COMPLEX_IFT', 'COMPLEX_IFFT']:
frame = np.fft.ifftn(frame)
#frame = sp.ifft(frame, axes=[0,1,2], center=False)
else:
raise ValueError('Your RECO_ft_mode is not supported')

return frame


def reco_phase_corr_pi(frame, Reco, actual_framenumber):
# start process
checkerboard = np.ones(shape=frame.shape)

def phase_corr(frame):
checkerboard = np.ones(shape=frame.shape[:3])
# Use NumPy broadcasting to alternate the signs
checkerboard[::2,::2,::2,0] = -1
checkerboard[1::2,1::2,::2,0] = -1

checkerboard[::2,1::2,1::2,0] = -1
checkerboard[1::2,::2,1::2,0] = -1

frame = frame * checkerboard * -1

return frame


def reco_cutoff(frame, Reco, actual_framenumber):
"""
Crops the input frame according to the specified RECO_size in the Reco dictionary.

Args:
frame: ndarray
Input frame to crop
Reco: dict
Dictionary containing the specified crop size (RECO_size) and offset (RECO_offset)
actual_framenumber: int
Index of the current frame

Returns:
newframe: ndarray
Cropped output frame
"""

# Use function only if Reco.RECO_size is not equal to size(frame)
dim_equal = True
for i,j in zip(get_value(Reco,'RECO_size'), frame.shape):
if i!=j:
dim_equal = False

if not dim_equal:
# Import variables
RECO_offset = get_value(Reco,'RECO_offset')[:, actual_framenumber]
RECO_size = get_value(Reco, 'RECO_size')

# Cut the new part with RECO_size and RECO_offset
pos_ges = []
for i in range(len(RECO_size)):
pos_ges.append(slice(RECO_offset[i], RECO_offset[i] + RECO_size[i]))
newframe = frame[tuple(pos_ges)]

else:
newframe = frame

return newframe


def reco_scale_phase_channels(frame, Reco, channel):
# check input
reco_scale = get_value(Reco,'RecoScaleChan')
if not isinstance(reco_scale, list):
reco_scale = [reco_scale]
if channel <= len(reco_scale) and reco_scale != None:
scale = reco_scale[int(channel)]
else:
scale = 1.0

reco_phase = get_value(Reco,'RecoPhaseChan')

if not isinstance(reco_phase, list):
reco_phase = [reco_phase]
if channel <= len(reco_phase) and reco_phase != None:
phase = reco_phase[int(channel)]
else:
phase = 0.0

spFactor = scale * np.exp(1j * phase * np.pi / 180.0)
# multiply each pixel by common scale and phase factor
frame = spFactor * frame
return frame


def reco_sumofsquares(frame, Reco):
out = np.sqrt( np.sum(np.square(np.abs(frame)), axis=4, keepdims=True) )
return out


def reco_transposition(frame, Reco, actual_framenumber):
# Import variables
RECO_transposition = get_value(Reco,'RECO_transposition')[actual_framenumber - 1]

# Calculate additional variables
dims = [frame.shape[i] for i in range(4)]

# Start process
if RECO_transposition > 0:
ch_dim1 = (RECO_transposition % 4)
ch_dim2 = RECO_transposition - 1
new_order = list(range(4))
new_order[int(ch_dim1)] = ch_dim2
new_order[int(ch_dim2)] = ch_dim1
frame = np.transpose(frame, new_order)
frame = np.reshape(frame, dims)

return frame
checkerboard[::2,::2,::2] = -1
checkerboard[1::2,1::2,::2] = -1
checkerboard[::2,1::2,1::2] = -1
checkerboard[1::2,::2,1::2] = -1
checkerboard *= -1
return checkerboard
Loading