Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
IPython DirectView instance
a : str
String name of the remote array
bins : int
Number of histogram bins
rng : (float, float)
Tuple of min, max of the range to histogram
normed : boolean
Should the histogram counts be normalized to 1
"""
nengines = len(view.targets)
# view.push(dict(bins=bins, rng=rng))
with view.sync_imports():
import numpy
rets = view.apply_sync(lambda a, b, rng: numpy.histogram(a,b,rng), Reference(a), bins, rng)
hists = [ r[0] for r in rets ]
lower_edges = [ r[1] for r in rets ]
# view.execute('hist, lower_edges = numpy.histogram(%s, bins, rng)' % a)
lower_edges = view.pull('lower_edges', targets=0)
hist_array = numpy.array(hists).reshape(nengines, -1)
# hist_array.shape = (nengines,-1)
total_hist = numpy.sum(hist_array, 0)
if normed:
total_hist = total_hist/numpy.sum(total_hist,dtype=float)
return total_hist, lower_edges
def remote_iterator(view, name):
    """Return an iterator on an object living on a remote engine.

    An engine-side iterator is created by executing ``it<name>=iter(<name>)``
    through ``view``; every step then calls ``next`` on that remote iterator
    with ``view.apply_sync`` and yields whatever comes back.

    Parameters
    ----------
    view : object providing ``execute`` and ``apply_sync``
        Presumably an ipyparallel view bound to a single engine -- TODO
        confirm; the value yielded is exactly what ``apply_sync`` returns.
    name : str
        Name of the remote iterable.  It is interpolated into the code string
        executed on the engine, and ``'it' + name`` is used as the name of the
        remote iterator.

    Yields
    ------
    The result of each remote ``next`` call.

    Raises
    ------
    RemoteError
        Any remote failure other than ``StopIteration`` is re-raised as is.
    """
    iterator_name = 'it' + name
    view.execute('%s=iter(%s)' % (iterator_name, name), block=True)
    while True:
        try:
            result = view.apply_sync(next, ipp.Reference(iterator_name))
        except RemoteError as e:
            if e.ename == 'StopIteration':
                # The remote iterator is exhausted.  A generator must finish
                # by returning: since PEP 479 a StopIteration raised inside a
                # generator body is converted into RuntimeError.
                return
            # Bare raise keeps the original traceback intact.
            raise
        else:
            yield result
# Script fragment: set up the partitioner and solver through `view`, then time
# solver.solve with the 'scalar' inner implementation (guarded by ns.scalar)
# and again with the 'vectorized' one, printing wall time and norm for each.
# NOTE(review): indentation has been lost in this listing, so the extent of the
# `if`/`else` bodies below cannot be determined from here -- confirm against
# the original script before editing.
view['u_hist'] = []
# set vector/scalar implementation details
impl = {}
impl['ic'] = 'vectorized'
impl['inner'] = 'scalar'
impl['bc'] = 'vectorized'
# execute some files so that the classes we need will be defined on the engines:
view.run('RectPartitioner.py')
view.run('wavesolver.py')
# setup remote partitioner
# note that Reference means that the argument passed to setup_partitioner will be the
# object named 'my_id' in the engine's namespace
view.apply_sync(setup_partitioner, ipp.Reference('my_id'), num_procs, grid, partition)
# wait for initial communication to complete
view.execute('mpi.barrier()')
# setup remote solvers
view.apply_sync(setup_solver, I,f,c,bc,Lx,Ly,partitioner=ipp.Reference('partitioner'), dt=0,implementation=impl)
# lambda for calling solver.solve:
# (`solver` is not defined in this fragment; presumably it is resolved in the
# engine's namespace when the lambda runs there -- TODO confirm)
_solve = lambda *args, **kwargs: solver.solve(*args, **kwargs)
if ns.scalar:
impl['inner'] = 'scalar'
# run first with element-wise Python operations for each cell
t0 = time.time()
ar = view.apply_async(_solve, tstop, dt=0, verbose=True, final_test=final_test, user_action=user_action)
if final_test:
# this sum is performed element-wise as results finish
s = sum(ar)
# NOTE(review): the next six lines repeat the six lines above verbatim. As
# written the solve is submitted a second time and t0 is reset; this looks like
# a copy/extraction artifact -- confirm which copy is intended.
# run first with element-wise Python operations for each cell
t0 = time.time()
ar = view.apply_async(_solve, tstop, dt=0, verbose=True, final_test=final_test, user_action=user_action)
if final_test:
# this sum is performed element-wise as results finish
s = sum(ar)
# the L2 norm (RMS) of the result:
norm = sqrt(s/num_cells)
else:
# -1 is the placeholder printed when final_test is false and no norm is computed
norm = -1
t1 = time.time()
print('scalar inner-version, Wtime=%g, norm=%g' % (t1-t0, norm))
impl['inner'] = 'vectorized'
# setup new solvers
view.apply_sync(setup_solver, I,f,c,bc,Lx,Ly,partitioner=ipp.Reference('partitioner'), dt=0,implementation=impl)
view.execute('mpi.barrier()')
# run again with numpy vectorized inner-implementation
t0 = time.time()
ar = view.apply_async(_solve, tstop, dt=0, verbose=True, final_test=final_test, user_action=user_action)
if final_test:
# this sum is performed element-wise as results finish
s = sum(ar)
# the L2 norm (RMS) of the result:
norm = sqrt(s/num_cells)
else:
# -1 is the placeholder printed when final_test is false and no norm is computed
norm = -1
t1 = time.time()
print('vector inner-version, Wtime=%g, norm=%g' % (t1-t0, norm))
# if ns.save is True, then u_hist stores the history of u as a list
# Script fragment: build one communicator per engine, then collect the root's
# publish URL and every engine's connection info on the client side.
# create the Communicator objects on the engines
view.execute('com = BinaryTreeCommunicator(id, root = id==root_id )')
# `root` is defined outside this fragment; presumably a view on the root
# engine only -- TODO confirm
pub_url = root.apply_sync(lambda : com.pub_url)
# gather the connection information into a dict
ar = view.apply_async(lambda : com.info)
peers = ar.get_dict()
# this is a dict, keyed by engine ID, of the connection info for the EngineCommunicators
# connect the engines to each other:
def connect(com, peers, tree, pub_url, root_id):
    """Engine-side helper: forward the connection details to ``com.connect``.

    ``com`` is the communicator object; ``peers``, ``tree``, ``pub_url`` and
    ``root_id`` are handed to its ``connect`` method unchanged, in that order.
    Nothing is returned.
    """
    connection_args = (peers, tree, pub_url, root_id)
    com.connect(*connection_args)
# Run connect() through the view; Reference('com') names the `com` object
# created earlier via view.execute, the remaining arguments are client-side values.
view.apply_sync(connect, ipp.Reference('com'), peers, btree, pub_url, root_id)
# functions that can be used for reductions
# max and min builtins can be used as well
def add(a, b):
    """Sum reduction step: combine two operands with ``+`` and return the result."""
    combined = a + b
    return combined
def mul(a, b):
    """Product reduction step: combine two operands with ``*`` and return the result."""
    combined = a * b
    return combined
# Assign the two reduction helpers through the view under the names
# 'add' and 'mul'.
view['add'] = add
view['mul'] = mul
# scatter some data
# (only the local list is built here; the scatter call itself is not part of
# this fragment)
data = list(range(1000))
ident=None):
"""construct and send an apply message via a socket.
This is the principal method with which all engine execution is performed by views.
"""
if self._closed:
raise RuntimeError("Client cannot be used after its sockets have been closed")
# defaults:
args = args if args is not None else []
kwargs = kwargs if kwargs is not None else {}
metadata = metadata if metadata is not None else {}
# validate arguments
if not callable(f) and not isinstance(f, (Reference, PrePickled)):
raise TypeError("f must be callable, not %s"%type(f))
if not isinstance(args, (tuple, list)):
raise TypeError("args must be tuple or list, not %s"%type(args))
if not isinstance(kwargs, dict):
raise TypeError("kwargs must be dict, not %s"%type(kwargs))
if not isinstance(metadata, dict):
raise TypeError("metadata must be dict, not %s"%type(metadata))
bufs = serialize.pack_apply_message(f, args, kwargs,
buffer_threshold=self.session.buffer_threshold,
item_threshold=self.session.item_threshold,
)
future = self._send(socket, "apply_request", buffers=bufs, ident=ident,
metadata=metadata, track=track)
# Script fragment: give every engine an id and a communicator, set up the
# partitioner and solver through `view`, then time solver.solve with the
# 'scalar' inner implementation (guarded by ns.scalar) and again with the
# 'vectorized' one, printing wall time and norm for each.
# NOTE(review): indentation has been lost in this listing, so the extent of the
# `if`/`else` bodies below cannot be determined from here -- confirm against
# the original script before editing.
# scatter engine IDs
view.scatter('my_id', range(num_procs), flatten=True)
# create the engine connectors
view.execute('com = EngineCommunicator()')
# gather the connection information into a single dict
ar = view.apply_async(lambda : com.info)
peers = ar.get_dict()
# print peers
# this is a dict, keyed by engine ID, of the connection info for the EngineCommunicators
# setup remote partitioner
# note that Reference means that the argument passed to setup_partitioner will be the
# object named 'com' in the engine's namespace
view.apply_sync(setup_partitioner, ipp.Reference('com'), peers, ipp.Reference('my_id'), num_procs, grid, partition)
# NOTE(review): fixed one-second pause; presumably gives the engine connections
# time to settle -- TODO confirm, a sleep is not a synchronization guarantee
time.sleep(1)
# convenience lambda to call solver.solve:
# (`solver` is not defined in this fragment; presumably it is resolved in the
# engine's namespace when the lambda runs there -- TODO confirm)
_solve = lambda *args, **kwargs: solver.solve(*args, **kwargs)
if ns.scalar:
impl['inner'] = 'scalar'
# setup remote solvers
view.apply_sync(setup_solver, I,f,c,bc,Lx,Ly, partitioner=ipp.Reference('partitioner'), dt=0,implementation=impl)
# run first with element-wise Python operations for each cell
t0 = time.time()
ar = view.apply_async(_solve, tstop, dt=0, verbose=True, final_test=final_test, user_action=user_action)
if final_test:
# this sum is performed element-wise as results finish
s = sum(ar)
# the L2 norm (RMS) of the result:
# NOTE(review): the next five code lines repeat the timing/submit/sum lines
# above verbatim. As written the solve is submitted a second time and t0 is
# reset; this looks like a copy/extraction artifact -- confirm which copy is
# intended.
t0 = time.time()
ar = view.apply_async(_solve, tstop, dt=0, verbose=True, final_test=final_test, user_action=user_action)
if final_test:
# this sum is performed element-wise as results finish
s = sum(ar)
# the L2 norm (RMS) of the result:
norm = sqrt(s/num_cells)
else:
# -1 is the placeholder printed when final_test is false and no norm is computed
norm = -1
t1 = time.time()
print('scalar inner-version, Wtime=%g, norm=%g'%(t1-t0, norm))
# run again with faster numpy-vectorized inner implementation:
impl['inner'] = 'vectorized'
# setup remote solvers
view.apply_sync(setup_solver, I,f,c,bc,Lx,Ly,partitioner=ipp.Reference('partitioner'), dt=0,implementation=impl)
t0 = time.time()
ar = view.apply_async(_solve, tstop, dt=0, verbose=True, final_test=final_test, user_action=user_action)
if final_test:
# this sum is performed element-wise as results finish
s = sum(ar)
# the L2 norm (RMS) of the result:
norm = sqrt(s/num_cells)
else:
# -1 is the placeholder printed when final_test is false and no norm is computed
norm = -1
t1 = time.time()
print('vector inner-version, Wtime=%g, norm=%g'%(t1-t0, norm))
# if ns.save is True, then u_hist stores the history of u as a list
# If the partition scheme is Nx1, then u can be reconstructed via 'gather':
def pwordfreq(view, fnames):
    """Count word frequencies in parallel and merge the per-engine results.

    view : IPython DirectView
        View used to scatter the filenames and run ``wordfreq`` remotely.
    fnames : sequence of str
        The filenames containing the split data; there must be exactly as
        many as ``view.targets``.

    Returns a dict mapping each word seen in any partial result to the sum
    of its counts over all partial results.
    """
    assert len(fnames) == len(view.targets)
    # Hand the filenames out under the name 'fname', then count remotely.
    view.scatter('fname', fnames, flatten=True)
    pending = view.apply(wordfreq, ipp.Reference('fname'))
    partial_counts = pending.get()

    # First pass: collect the full vocabulary across all partial results.
    vocabulary = set()
    for partial in partial_counts:
        vocabulary.update(partial.keys())

    # Second pass: start every word at zero and add up the partial counts.
    freqs = dict.fromkeys(vocabulary, 0)
    for partial in partial_counts:
        for word, occurrences in partial.items():
            freqs[word] += occurrences
    return freqs