Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_create(tmp_path):
    """Test creation of GSD files.

    Opens ``test_create.gsd`` under ``tmp_path`` in ``'xb'`` mode and closes
    it again right away; the test passes when the open call does not raise.

    Args:
        tmp_path: Path-like directory that is joined with the file name
            (presumably pytest's ``tmp_path`` fixture).
    """
    # Use the handle as a context manager so it is closed before the test
    # returns instead of being left for the garbage collector.
    with gsd.fl.open(mode='xb',
                     name=tmp_path / "test_create.gsd",
                     application="test_create",
                     schema="none",
                     schema_version=[1, 2]):
        pass
def test_large_n(tmp_path, N):
    """Test data chunks and files larger than 2 GB.

    Writes ``N`` consecutive ``uint32`` values as one chunk named ``data``,
    reads that chunk back from frame 0 and checks that every element is
    unchanged.

    Args:
        tmp_path: Path-like directory that is joined with the file name
            (presumably pytest's ``tmp_path`` fixture).
        N: Number of ``uint32`` elements to write.
    """
    gc.collect()

    data = numpy.linspace(0, N, num=N, endpoint=False, dtype=numpy.uint32)
    with gsd.fl.open(name=tmp_path / 'test_large_N.gsd',
                     mode='xb',
                     application='test_large_N',
                     schema='none',
                     schema_version=[1, 2]) as f:
        f.write_chunk(name='data', data=data)
        f.end_frame()

    with gsd.fl.open(name=tmp_path / 'test_large_N.gsd',
                     mode='rb',
                     application='test_large_N',
                     schema='none',
                     schema_version=[1, 2]) as f:
        read_data = f.read_chunk(frame=0, name='data')

        # Compare the arrays with as little memory as possible so this test
        # can pass on CI platforms: keep only the element-wise difference and
        # release both full-size arrays before inspecting it.
        diff = data - read_data
        data = None
        read_data = None
        gc.collect()

        # Count the non-zero differences directly. Squaring the difference
        # first (as a uint32 array) wraps modulo 2**32, so a difference of
        # 65536 would square to 0 and hide a mismatch; it would also allocate
        # one more full-size array.
        assert numpy.count_nonzero(diff) == 0
def test_dtype(tmp_path, typ):
    """Test all supported data types.

    Builds a 1D, a 2D and an empty array of dtype ``typ``, writes them as
    chunks of a single frame and reads the 1D and 2D chunks back.

    Args:
        tmp_path: Path-like directory that is joined with the file name
            (presumably pytest's ``tmp_path`` fixture).
        typ: dtype handed to ``numpy.array`` for every test array.
    """
    data1d = numpy.array([1, 2, 3, 4, 5, 10012], dtype=typ)
    data2d = numpy.array([[10, 20], [30, 40], [50, 80]], dtype=typ)
    data_zero = numpy.array([], dtype=typ)

    # Create the file first and close the handle immediately, so no open
    # handle is left behind when the same path is reopened below.
    with gsd.fl.open(mode='xb',
                     name=tmp_path / "test_dtype.gsd",
                     application="test_dtype",
                     schema="none",
                     schema_version=[1, 2]):
        pass

    with gsd.fl.open(name=tmp_path / "test_dtype.gsd",
                     mode='wb',
                     application="test_dtype",
                     schema="none",
                     schema_version=[1, 2]) as f:
        f.write_chunk(name='data1d', data=data1d)
        f.write_chunk(name='data2d', data=data2d)
        f.write_chunk(name='data_zero', data=data_zero)
        f.end_frame()

    with gsd.fl.open(name=tmp_path / "test_dtype.gsd",
                     mode='rb',
                     application="test_dtype",
                     schema="none",
                     schema_version=[1, 2]) as f:
        read_data1d = f.read_chunk(frame=0, name='data1d')
        read_data2d = f.read_chunk(frame=0, name='data2d')
        # NOTE(review): nothing visible here compares the arrays that were
        # read with the ones that were written -- confirm the checks exist.
def test_large_n(tmp_path, N):
    """Exercise chunks and files bigger than 2 GB.

    ``N`` consecutive ``uint32`` values are written as one chunk named
    ``data`` and that chunk is then read back from frame 0.

    Args:
        tmp_path: Path-like directory that is joined with the file name.
        N: Number of ``uint32`` elements to write.
    """

    def _open(mode):
        # Both passes use the same file, application and schema; only the
        # open mode differs.
        return gsd.fl.open(name=tmp_path / 'test_large_N.gsd',
                           mode=mode,
                           application='test_large_N',
                           schema='none',
                           schema_version=[1, 2])

    gc.collect()
    values = numpy.linspace(0, N, num=N, endpoint=False, dtype=numpy.uint32)

    with _open('xb') as writer:
        writer.write_chunk(name='data', data=values)
        writer.end_frame()

    with _open('rb') as reader:
        restored = reader.read_chunk(frame=0, name='data')
        # compare the array with memory usage so this test can pass on CI
        # platforms
        # NOTE(review): the comparison announced above is not part of this
        # copy of the test -- confirm whether it was cut off.
'gsd.fl': sys.modules['gsd.fl'],
}
attributes = {}
if args.schema == 'hoomd':
traj = hoomd_open(args.file, mode=args.mode)
handle = traj.file
local_ns.update({
'handle': handle,
'traj': traj,
})
attributes.update({"Number of frames": len(traj)})
else:
if args.mode not in ['rb', 'rb+', 'ab']:
raise ValueError("Unsupported schema for creating a file.")
handle = fl.open(args.file, args.mode)
local_ns.update({
'handle': handle,
})
extras = "\n".join(
"{}: {}".format(key, val) for key, val in attributes.items())
code.interact(local=local_ns,
banner=SHELL_BANNER.format(python_version=sys.version,
gsd_version=__version__,
fn=args.file,
extras=extras + "\n"))
def create_file(N, size, write_keys):
    """Create the output file ``test.gsd`` and print the write time per key.

    Args:
        N: Forwarded to ``compute_nframes`` and ``write_file``.
        size: Forwarded to ``compute_nframes``.
        write_keys: Sized collection of keys; forwarded to
            ``compute_nframes`` and ``write_file`` and used to normalise the
            reported time.
    """
    nframes = compute_nframes(N, size, write_keys)
    print(f'Writing {nframes} frames with {len(write_keys)} keys per frame')

    # write the file and time how long it takes; perf_counter is used because
    # it is monotonic and intended for measuring short durations
    with gsd.fl.open(name='test.gsd',
                     mode='wb',
                     application="My application",
                     schema="My Schema",
                     schema_version=[1, 0]) as f:
        start = time.perf_counter()
        write_file(f, nframes, N, write_keys)

    # ensure that all writes to disk are completed and drop file system cache
    # call(['sudo', '/bin/sync'])
    # NOTE(review): the explicit sync above is disabled in the original; the
    # clock is stopped after the file has been closed.
    end = time.perf_counter()

    call(['sudo', '/sbin/sysctl', 'vm.drop_caches=3'], stdout=PIPE)
    print((end - start) / 1e-6 / nframes / len(write_keys), "us per key")
def read_file(read_keys):
    """Time opening ``test.gsd`` and reading it sequentially.

    Prints the time taken to open the file and the sequential read time per
    key, measured over at most the first 100 frames, then syncs and drops the
    file system cache.

    Args:
        read_keys: Sized collection of keys; forwarded to
            ``read_sequential_file`` and used to normalise the reported time.
    """
    # perf_counter is monotonic and intended for measuring short durations
    # such as the open time reported below.
    start = time.perf_counter()
    with gsd.fl.open(name='test.gsd',
                     mode='rb',
                     application="My application",
                     schema="My Schema",
                     schema_version=[1, 0]) as f:
        end = time.perf_counter()
        print("Open time:", (end - start) / 1e-3, "ms")

        nframes_read = min(f.nframes, 100)

        # Read the file sequentially and measure the time taken
        start = time.perf_counter()
        read_sequential_file(f, read_keys, nframes_read)
        end = time.perf_counter()
        print("Sequential read time:",
              (end - start) / 1e-6 / nframes_read / len(read_keys),
              "us / key")

        # drop the file system cache
        call(['sudo', '/bin/sync'])
        call(['sudo', '/sbin/sysctl', 'vm.drop_caches=3'], stdout=PIPE)