Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
import unittest
import numpy as np
from sigpy import interp, config
if config.cupy_enabled:
import cupy as cp
if __name__ == '__main__':
unittest.main()
class TestInterp(unittest.TestCase):
def test_interpolate(self):
xps = [np]
if config.cupy_enabled:
xps.append(cp)
batch = 2
for xp in xps:
for ndim in [1, 2, 3]:
y = np.ones(5)
npt.assert_allclose(thresh.l2_proj(10, x), y)
def test_soft_thresh(self):
x = np.array([-2, -1.5, -1, 0.5, 0, 0.5, 1, 1.5, 2])
y = np.array([-1, -0.5, 0, 0, 0, 0, 0, 0.5, 1])
npt.assert_allclose(thresh.soft_thresh(1, x), y)
def test_hard_thresh(self):
x = np.array([-2, -1.5, -1, 0.5, 0, 0.5, 1, 1.5, 2])
y = np.array([-2, -1.5, 0, 0, 0, 0, 0, 1.5, 2])
npt.assert_allclose(thresh.hard_thresh(1, x), y)
if config.cupy_enabled:
def test_soft_thresh_cuda(self):
x = cp.array([-2, -1.5, -1, 0.5, 0, 0.5, 1, 1.5, 2])
y = cp.array([-1, -0.5, 0, 0, 0, 0, 0, 0.5, 1])
lamda = cp.array([1.0])
cp.testing.assert_allclose(thresh.soft_thresh(lamda, x), y)
def test_hard_thresh_cuda(self):
x = cp.array([-2, -1.5, -1, 0.5, 0, 0.5, 1, 1.5, 2])
y = cp.array([-2, -1.5, 0, 0, 0, 0, 0, 1.5, 2])
lamda = cp.array([1.0])
cp.testing.assert_allclose(thresh.hard_thresh(lamda, x), y)
def test_shepp_logan_SenseRecon(self):
img, mps, ksp = self.shepp_logan_setup()
lamda = 0
for solver in ['ConjugateGradient',
'GradientMethod',
'PrimalDualHybridGradient',
'ADMM']:
with self.subTest(solver=solver):
img_rec = app.SenseRecon(
ksp, mps, lamda, solver=solver,
show_pbar=False).run()
npt.assert_allclose(img, img_rec, atol=1e-2, rtol=1e-2)
if sp.config.mpi4py_enabled:
def test_shepp_logan_SenseRecon_with_comm(self):
img, mps, ksp = self.shepp_logan_setup()
lamda = 0
comm = sp.Communicator()
ksp = ksp[comm.rank::comm.size]
mps = mps[comm.rank::comm.size]
for solver in ['ConjugateGradient',
'GradientMethod',
'PrimalDualHybridGradient',
'ADMM']:
with self.subTest(solver=solver):
img_rec = app.SenseRecon(
ksp, mps, lamda, comm=comm, solver=solver,
show_pbar=False).run()
npt.assert_allclose(img, img_rec, atol=1e-2, rtol=1e-2)
import unittest
import pickle
import numpy as np
import numpy.testing as npt
from sigpy import backend, linop, util, config
if __name__ == '__main__':
unittest.main()
dtypes = [np.float32, np.float64, np.complex64, np.complex128]
devices = [backend.cpu_device]
if config.cupy_enabled:
devices.append(backend.Device(0))
class TestLinop(unittest.TestCase):
def check_linop_unitary(self, A,
device=backend.cpu_device, dtype=np.float):
device = backend.Device(device)
x = util.randn(A.ishape, dtype=dtype, device=device)
xp = device.xp
with device:
xp.testing.assert_allclose(A.H * A * x, x,
atol=1e-5, rtol=1e-5)
def check_linop_linear(self, A,
device=backend.cpu_device, dtype=np.float):
img_shape = [16, 16]
mps_shape = [8, 16, 16]
img = sp.randn(img_shape, dtype=np.complex)
mps = sp.randn(mps_shape, dtype=np.complex)
y, x = np.mgrid[:16, :16]
coord = np.stack([np.ravel(y - 8), np.ravel(x - 8)], axis=1)
coord = coord.astype(np.float)
A = linop.Sense(mps, coord=coord, coil_batch_size=1)
check_linop_adjoint(A, dtype=np.complex)
npt.assert_allclose(sp.fft(img * mps, axes=[-1, -2]).ravel(),
(A * img).ravel(), atol=0.1, rtol=0.1)
if sp.config.mpi4py_enabled:
def test_sense_model_with_comm(self):
img_shape = [16, 16]
mps_shape = [8, 16, 16]
comm = sp.Communicator()
img = sp.randn(img_shape, dtype=np.complex)
mps = sp.randn(mps_shape, dtype=np.complex)
comm.allreduce(img)
comm.allreduce(mps)
ksp = sp.fft(img * mps, axes=[-1, -2])
A = linop.Sense(mps[comm.rank::comm.size], comm=comm)
npt.assert_allclose(A.H(ksp[comm.rank::comm.size]), np.sum(
sp.ifft(ksp, axes=[-1, -2]) * mps.conjugate(), 0))
def test_convolve_valid(self):
mode = 'valid'
devices = [backend.cpu_device]
if config.cupy_enabled:
devices.append(backend.Device(0))
for device in devices:
xp = device.xp
with device:
for dtype in [np.float32, np.float64,
np.complex64, np.complex128]:
x = util.dirac([1, 3], device=device, dtype=dtype)
W = xp.ones([1, 3], dtype=dtype)
y = backend.to_device(conv.convolve(
x, W, mode=mode), backend.cpu_device)
npt.assert_allclose(y, [[1]], atol=1e-5)
x = util.dirac([1, 3], device=device, dtype=dtype)
W = xp.ones([1, 2], dtype=dtype)
y = backend.to_device(conv.convolve(
mag = abs_input - lamda
mag = (abs(mag) + mag) / 2
return mag * sign
@nb.vectorize # pragma: no cover
def _hard_thresh(lamda, input):
abs_input = abs(input)
if abs_input > lamda:
return input
else:
return 0
if config.cupy_enabled: # pragma: no cover
import cupy as cp
_soft_thresh_cuda = cp.ElementwiseKernel(
'S lamda, T input',
'T output',
"""
S abs_input = abs(input);
T sign;
if (abs_input == 0)
sign = 0;
else
sign = input / (T) abs_input;
S mag = abs_input - lamda;
mag = (abs(mag) + mag) / 2.;
output = (T) mag * sign;
Returns:
array: data array of shape
:math:`[..., m_1, ..., m_D]` if multi_channel is False,
:math:`[..., c_i, m_1, ..., m_D]` otherwise.
"""
data_shape = tuple(data_shape)
xp = backend.get_array_module(output)
if xp == np:
data = _convolve_data_adjoint(output, filt, data_shape,
mode=mode, strides=strides,
multi_channel=multi_channel)
else: # pragma: no cover
if config.cudnn_enabled:
if np.issubdtype(output.dtype, np.floating):
data = _convolve_data_adjoint_cuda(
output, filt, data_shape,
mode=mode, strides=strides,
multi_channel=multi_channel)
else:
data = _complex(_convolve_data_adjoint_cuda,
output, filt.conj(), data_shape,
mode=mode, strides=strides,
multi_channel=multi_channel)
else:
raise RuntimeError(
'cudnn must be installed to perform convolution on GPU.')
return data
mode (str): {'full', 'valid'}.
strides (None or tuple of ints): convolution strides of length D.
multi_channel (bool): specify if input/output has multiple channels.
Returns:
array: output array of shape:
:math:`[..., p_1, ..., p_D]` if multi_channel is False,
:math:`[..., c_o, p_1, ..., p_D]` otherwise.
"""
xp = backend.get_array_module(data)
if xp == np:
output = _convolve(data, filt, mode=mode, strides=strides,
multi_channel=multi_channel)
else: # pragma: no cover
if config.cudnn_enabled:
if np.issubdtype(data.dtype, np.floating):
output = _convolve_cuda(data, filt,
mode=mode, strides=strides,
multi_channel=multi_channel)
else:
output = _complex(_convolve_cuda, data, filt,
mode=mode, strides=strides,
multi_channel=multi_channel)
else:
raise RuntimeError(
'cudnn must be installed to perform convolution on GPU.')
return output
# -*- coding: utf-8 -*-
"""FFT functions.
This module contains FFT functions that support centered operation.
"""
import numpy as np
from sigpy import backend, config, util
if config.cupy_enabled:
import cupy as cp
def fft(input, oshape=None, axes=None, center=True, norm='ortho'):
"""FFT function that supports centering.
Args:
input (array): input array.
oshape (None or array of ints): output shape.
axes (None or array of ints): Axes over which to compute the FFT.
norm (Nonr or ``"ortho"``): Keyword to specify the normalization mode.
Returns:
array: FFT result of dimension oshape.
See Also: