Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
arr[len(loaded):len(loaded)+10, :] = ones
# Test rewriting
arr[3:13, :] = ones
arr.close()
loaded = np.load(filename)
assert np.array_equal(np.r_[original, ones, append2], loaded)
# Test pickling
arr = NpyArray(filename)
serialized = pickle.dumps(arr)
arr = pickle.loads(serialized)
assert np.array_equal(np.r_[original, ones, append2], arr[:])
# Test truncate method
arr = NpyArray(filename)
arr.truncate(len(original))
assert np.array_equal(original, arr[:])
arr.close()
loaded = np.load(filename)
assert np.array_equal(original, loaded)
# Test that truncation on initialization works
arr = NpyArray(filename, truncate=True)
arr.append(append)
arr.close()
loaded = np.load(filename)
assert np.array_equal(append, loaded)
os.remove(filename)
# Test closing and opening the pool
pool.close()
pool = ArrayPool.open(pool.name)
assert len(pool) == total/bs
pool.close()
# Test opening from a moved location
os.rename(pool.path, pool.path + '_move')
pool = ArrayPool.open(pool.name + '_move')
assert len(pool) == total/bs
assert np.array_equal(pool[2]['S1'], batch2['S1'])
# Test adding a random .npy file
r = np.random.rand(3*bs)
newfile = os.path.join(pool.path, 'test.npy')
arr = NpyArray(newfile, r)
pool.add_store('test', ArrayStore(arr, bs))
assert len(pool.get_store('test')) == 3
assert np.array_equal(pool[2]['test'], r[-bs:])
# Test removing the pool
pool.delete()
assert not os.path.exists(pool.path)
# Remove the pool container folder
os.rmdir(pool.prefix)
filename = 'test.npy'
original = np.random.rand(3, 2)
append = np.random.rand(10, 2)
ones = np.ones((10,2))
append2 = np.random.rand(23, 2)
arr = NpyArray(filename, truncate=True)
arr.append(original)
assert np.array_equal(original, arr[:])
arr.close()
loaded = np.load(filename)
assert np.array_equal(original, loaded)
# Test appending and reading
arr = NpyArray(filename)
arr.append(append)
arr.flush()
loaded = np.load(filename)
assert np.array_equal(np.r_[original, append], loaded)
# Test further appending
arr.append(append2)
assert np.array_equal(np.r_[original, append, append2], arr[:])
arr.flush()
loaded = np.load(filename)
assert np.array_equal(np.r_[original, append, append2], loaded)
# Test that writing over the array fails
with pytest.raises(Exception):
arr[len(loaded):len(loaded)+10, :] = ones
def test_npy_array_multiple_instances():
    """Exercise two NpyArray instances that append to the same file.

    The first instance writes three initial rows, flushes, and then appends
    ten more rows, flushing only its underlying file stream afterwards. A
    second instance is then opened on the same file and appends ten rows of
    its own. The assertions expect the second instance to report 13 rows made
    up of the initial rows followed by its own appended rows.
    """
    base_rows = np.random.rand(3, 2)
    first_extra = np.random.rand(10, 2)
    second_extra = np.random.rand(10, 2)
    path = 'test.npy'

    # First instance: create the file from the initial rows, then append
    primary = NpyArray(path, array=base_rows)
    primary.flush()
    primary.append(first_extra)
    assert len(primary) == 13  # 3 initial rows + 10 appended rows
    primary.fs.flush()

    # Second instance on the same file, appending while the first is still open
    secondary = NpyArray(path)
    secondary.append(second_extra)
    assert len(secondary) == 13
    seen_by_secondary = secondary[:]
    expected_rows = np.r_[base_rows, second_extra]
    assert np.array_equal(seen_by_secondary, expected_rows)

    # Clean up both handles and the file on disk
    primary.close()
    secondary.close()
    os.remove(path)
# Test pickling
arr = NpyArray(filename)
serialized = pickle.dumps(arr)
arr = pickle.loads(serialized)
assert np.array_equal(np.r_[original, ones, append2], arr[:])
# Test truncate method
arr = NpyArray(filename)
arr.truncate(len(original))
assert np.array_equal(original, arr[:])
arr.close()
loaded = np.load(filename)
assert np.array_equal(original, loaded)
# Test that truncation on initialization works
arr = NpyArray(filename, truncate=True)
arr.append(append)
arr.close()
loaded = np.load(filename)
assert np.array_equal(append, loaded)
os.remove(filename)
def test_npy_store():
    """Exercise NpyStore on a file written through NpyArray.

    A 40x2 random array is written to disk, wrapped in a store with batch
    size 10 and four batches, and passed to the shared basic store checks.
    Batches are then written at indices 4 and 5, a write at index 7 is
    expected to raise IndexError, and finally the store is deleted.
    """
    path = 'test'
    data = np.random.rand(40, 2)

    # Write the data to disk and release the handle before opening the store
    NpyArray(path, data).close()

    npy_store = NpyStore(path, batch_size=10, n_batches=4)
    run_basic_store_tests(npy_store, data)

    # Write two further batches directly after the existing four
    new_batch = np.random.rand(10, 2)
    npy_store[4] = new_batch
    npy_store[5] = 2 * new_batch
    assert np.array_equal(npy_store[5], 2 * new_batch)

    # Index 7 would leave a gap after batch 5, so the write must be rejected
    with pytest.raises(IndexError):
        npy_store[7] = 3 * new_batch

    npy_store.delete()
arr.flush()
loaded = np.load(filename)
assert np.array_equal(np.r_[original, append, append2], loaded)
# Test that writing over the array fails
with pytest.raises(Exception):
arr[len(loaded):len(loaded)+10, :] = ones
# Test rewriting
arr[3:13, :] = ones
arr.close()
loaded = np.load(filename)
assert np.array_equal(np.r_[original, ones, append2], loaded)
# Test pickling
arr = NpyArray(filename)
serialized = pickle.dumps(arr)
arr = pickle.loads(serialized)
assert np.array_equal(np.r_[original, ones, append2], arr[:])
# Test truncate method
arr = NpyArray(filename)
arr.truncate(len(original))
assert np.array_equal(original, arr[:])
arr.close()
loaded = np.load(filename)
assert np.array_equal(original, loaded)
# Test that truncation on initialization works
arr = NpyArray(filename, truncate=True)
arr.append(append)
arr.close()
original = np.random.rand(3, 2)
append = np.random.rand(10, 2)
append_clone = np.random.rand(10, 2)
filename = 'test.npy'
# Test appending and reading
arr = NpyArray(filename, array=original)
arr.flush()
arr.append(append)
assert(len(arr) == 13)
arr.fs.flush()
# Make a second instance and a simultaneous append
arr_clone = NpyArray(filename)
arr_clone.append(append_clone)
assert len(arr_clone) == 13
assert np.array_equal(arr_clone[:], np.r_[original, append_clone])
arr.close()
arr_clone.close()
os.remove(filename)
def __init__(self, file, batch_size, n_batches=-1):
    """Set up the store on top of an NpyArray.

    Parameters
    ----------
    file : NpyArray or str
        An existing NpyArray object, or the path to a .npy file from which an
        NpyArray is created.
    batch_size
        Forwarded unchanged to the parent class initializer.
    n_batches : int, optional
        Number of batches to make available from the file. The default, -1,
        makes all available batches accessible.

    """
    # Reuse a ready-made NpyArray as is; anything else is treated as a path
    if isinstance(file, NpyArray):
        array = file
    else:
        array = NpyArray(file)
    super(NpyStore, self).__init__(array, batch_size, n_batches)