import numpy

from einops import rearrange, reduce


def test8(x):
    # max-pooling: each non-overlapping 2x2 window collapses to its maximum
    y = reduce(x, 'b c (h h1) (w w1) -> b c h w', reduction='max', h1=2, w1=2)
    assert y.shape == (10, 20, 30 // 2, 40 // 2)
    return y
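
# Illustrative sketch, not part of the original suite: what the '(h h1) (w w1)'
# grouping in test8 computes on a concrete array. Each non-overlapping 2x2
# window is collapsed to its maximum.
def _demo_maxpool_sketch():
    t = numpy.arange(16).reshape(1, 1, 4, 4)
    pooled = reduce(t, 'b c (h h1) (w w1) -> b c h w', reduction='max', h1=2, w1=2)
    assert pooled.shape == (1, 1, 2, 2)
    # the top-left 2x2 window of t is [[0, 1], [4, 5]], so its max is 5
    assert pooled[0, 0, 0, 0] == 5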
def test_reduction_stress_imperatives():
    # stress test: random shapes and random axis permutations, checked against numpy
    for backend in imp_op_backends:
        for reduction in list(_reductions) + ['rearrange']:
            dtype = 'int64'
            coincide = numpy.array_equal
            if reduction in ['mean', 'prod']:
                dtype = 'float64'
                coincide = numpy.allclose
            for n_axes in range(6 if 'mxnet' in backend.framework_name else 11):
                shape = numpy.random.randint(2, 4, size=n_axes)
                permutation = numpy.random.permutation(n_axes)
                skipped = 0 if reduction == 'rearrange' else numpy.random.randint(n_axes + 1)
                left = ' '.join('x' + str(i) for i in range(n_axes))
                right = ' '.join('x' + str(i) for i in permutation[skipped:])
                pattern = left + '->' + right
                x = numpy.arange(1, 1 + numpy.prod(shape), dtype=dtype).reshape(shape)
                if reduction == 'prod':
                    x /= x.mean()
                result1 = reduce(x, pattern, reduction=reduction)
                result2 = x.transpose(permutation)
                if skipped > 0:
                    result2 = getattr(result2, reduction)(axis=tuple(range(skipped)))
                assert coincide(result1, result2)
                if n_axes == 0 and 'mxnet' in backend.framework_name:
                    # known mxnet bug: can't attach gradients to a scalar
                    continue
                check_op_against_numpy(backend, x, pattern, reduction=reduction,
                                       axes_lengths={}, is_symbolic=False)
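
# Illustrative sketch, not part of the original suite: the identity the stress
# test checks, spelled out for one concrete pattern. Reducing the leading
# 'skipped' axes of the permuted array matches einops' reduce over the axes
# missing from the right-hand side.
def _demo_reduce_vs_numpy_sketch():
    x = numpy.arange(24).reshape(2, 3, 4)
    # 'x0 x1 x2 -> x2 x0' drops x1; numpy equivalent: permute to (x1, x2, x0),
    # then reduce over the leading (skipped) axis
    r1 = reduce(x, 'x0 x1 x2 -> x2 x0', reduction='sum')
    r2 = x.transpose(1, 2, 0).sum(axis=0)
    assert numpy.array_equal(r1, r2)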
def test_gradients_imperatives():
    # lazy - just checking reductions
    for reduction in _reductions:
        x = numpy.arange(1, 1 + 2 * 3 * 4).reshape(2, 3, 4).astype('float32')
        results = {}
        for backend in imp_op_backends:
            y0 = backend.from_numpy(x)
            if not hasattr(y0, 'grad'):
                continue
            if 'mxnet' in backend.framework_name:
                backend.mx.autograd.set_recording(True)
            # chain several reductions down to a scalar, then backpropagate
            y1 = reduce(y0, 'a b c -> c a', reduction=reduction)
            y2 = reduce(y1, 'c a -> a c', reduction=reduction)
            y3 = reduce(y2, 'a (c1 c2) -> a', reduction=reduction, c1=2)
            y4 = reduce(y3, '... -> ', reduction=reduction)
            if 'mxnet' in backend.framework_name:
                backend.mx.autograd.set_recording(False)
            y4.backward()
            grad = backend.to_numpy(y0.grad)
            results[backend.framework_name] = grad
        print('comparing gradients for', results.keys())
        # every pair of frameworks must agree on the input gradient
        for name1, grad1 in results.items():
            for name2, grad2 in results.items():
                assert numpy.allclose(grad1, grad2), [name1, name2, 'provided different gradients']
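
# Illustrative sketch, not part of the original suite: the property checked
# above, shown directly for one framework. Assumes torch is installed; the
# original test goes through the backend wrapper objects instead.
def _demo_torch_grad_sketch():
    import torch
    t = torch.arange(1., 25.).reshape(2, 3, 4).requires_grad_(True)
    out = reduce(t, 'a b c -> ', reduction='sum')
    out.backward()
    # for a sum over all elements, every input position receives gradient 1
    assert torch.allclose(t.grad, torch.ones_like(t))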
def operation(x):
    # closes over pattern, axes_lengths and reduction from the enclosing scope;
    # 'rearrange' is dispatched separately since it takes no reduction argument
    if reduction == 'rearrange':
        return rearrange(x, pattern, **axes_lengths)
    else:
        return reduce(x, pattern, reduction, **axes_lengths)
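
# Illustrative sketch, not part of the original suite: when a pattern drops no
# axes, reduce degenerates to a pure rearrangement, which is why the test
# chains above can route plain transposes through reduce as well.
def _demo_reduce_equals_rearrange_sketch():
    x = numpy.arange(12).reshape(3, 4)
    assert numpy.array_equal(rearrange(x, 'a b -> b a'),
                             reduce(x, 'a b -> b a', reduction='max'))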
def test_reduction_symbolic():
    for backend in sym_op_backends:
        for reduction in _reductions:
            input = numpy.arange(2 * 3 * 4 * 5 * 6, dtype='int64').reshape(2, 3, 4, 5, 6)
            # each case: pattern, axes_lengths, expected result computed with numpy
            test_cases = [
                ['a ... c d e -> (e c) a', {},
                 getattr(input, reduction)(axis=(1, 3)).transpose(2, 1, 0).reshape(-1, 2)],
                ['a b c d e ... -> (e c) a', {},
                 getattr(input, reduction)(axis=(1, 3)).transpose(2, 1, 0).reshape(-1, 2)],
                ['a b c d e -> (e c a)', {},
                 getattr(input, reduction)(axis=(1, 3)).transpose(2, 1, 0).reshape(-1)],
                ['(a a2) ... -> (a2 a) ...', dict(a2=1),
                 input],
            ]
            for pattern, axes_lengths, expected_result in test_cases:
                shapes = [input.shape]
                if backend.framework_name != 'mxnet.symbol':
                    # mxnet can't handle non-specified shapes
                    shapes.append([None for _ in input.shape])
                for shape in shapes:
                    sym = backend.create_symbol(shape)
                    result_sym = reduce(sym, pattern, reduction=reduction, **axes_lengths)
                    result = backend.eval_symbol(result_sym, [(sym, input)])
                    assert numpy.allclose(result, expected_result)
                if True:
                    # repeat with a partially-specified shape: axes named in the
                    # pattern become None and their lengths are passed explicitly
                    shape = []
                    _axes_lengths = {**axes_lengths}
                    for axis, length in zip('abcde', input.shape):
                        # filling as much as possible with Nones
                        if axis in pattern:
                            shape.append(None)
                            _axes_lengths[axis] = length
                        else:
                            shape.append(length)
                    sym = backend.create_symbol(shape)
                    result_sym = reduce(sym, pattern, reduction=reduction, **_axes_lengths)
                    result = backend.eval_symbol(result_sym, [(sym, input)])
                    assert numpy.allclose(result, expected_result)
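
# Illustrative sketch, not part of the original suite: one expected_result
# expression from the table above, verified against eager einops on numpy
# (no symbolic backend required).
def _demo_symbolic_expected_sketch():
    inp = numpy.arange(2 * 3 * 4 * 5 * 6, dtype='int64').reshape(2, 3, 4, 5, 6)
    expected = inp.max(axis=(1, 3)).transpose(2, 1, 0).reshape(-1, 2)
    assert numpy.array_equal(reduce(inp, 'a b c d e -> (e c) a', reduction='max'), expected)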
def test_ellipsis_ops_numpy():
    x = numpy.arange(2 * 3 * 4 * 5 * 6).reshape([2, 3, 4, 5, 6])
    for pattern in identity_patterns:
        assert numpy.array_equal(x, rearrange(x, pattern)), pattern

    for pattern1, pattern2 in equivalent_rearrange_patterns:
        assert numpy.array_equal(rearrange(x, pattern1), rearrange(x, pattern2))

    for reduction in ['min', 'max', 'sum']:
        for pattern1, pattern2 in equivalent_reduction_patterns:
            assert numpy.array_equal(reduce(x, pattern1, reduction=reduction),
                                     reduce(x, pattern2, reduction=reduction))

    # now just check coincidence with numpy
    all_rearrange_patterns = [*identity_patterns]
    for pattern_pairs in equivalent_rearrange_patterns:
        all_rearrange_patterns.extend(pattern_pairs)
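
# Illustrative sketch, not part of the original suite: the kind of pair
# equivalent_rearrange_patterns contains, i.e. an explicit pattern and its
# ellipsis shorthand must produce identical results.
def _demo_ellipsis_equivalence_sketch():
    x = numpy.arange(2 * 3 * 4).reshape(2, 3, 4)
    assert numpy.array_equal(rearrange(x, 'a b c -> (a b) c'),
                             rearrange(x, 'a b ... -> (a b) ...'))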