Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
Examples
--------
>>> tf = ColorTransferFunction( (-10.0, -5.0) )
>>> tf.sample_colormap(-7.0, 0.01, colormap='arbre')
"""
v = np.float64(v)
if col_bounds is None:
rel = (v - self.x_bounds[0])/(self.x_bounds[1] - self.x_bounds[0])
else:
rel = (v - col_bounds[0])/(col_bounds[1] - col_bounds[0])
cmap = get_cmap(colormap)
r,g,b,a = cmap(rel)
if alpha is None: alpha = a
self.add_gaussian(v, w, [r, g, b, alpha])
mylog.debug("Adding gaussian at %s with width %s and colors %s" % (
v, w, (r,g,b,alpha)))
from yt.utilities.parallel_tools.parallel_analysis_interface import \
enable_parallelism
parallel_capable = enable_parallelism()
return parallel_capable
# This fallback is for Paraview:
# We use two signals, SIGUSR1 and SIGUSR2.  In a non-threaded environment we
# install a handler for each so that a running process can be inspected from
# outside: SIGUSR1 is bound to signal_print_traceback and SIGUSR2 to
# signal_ipython.
# NOTE(review): an earlier version of this comment described SIGUSR2 as
# raising a RuntimeError for use inside pdb; the handler registered below is
# signal_ipython -- confirm which behaviour is intended.
try:
    signal.signal(signal.SIGUSR1, signal_print_traceback)
    mylog.debug("SIGUSR1 registered for traceback printing")
    signal.signal(signal.SIGUSR2, signal_ipython)
    mylog.debug("SIGUSR2 registered for IPython Insertion")
except (ValueError, RuntimeError, AttributeError) as e:
    # Registration is best-effort.  signal.signal() raises ValueError when it
    # is not called from the main thread, and the SIGUSR* constants do not
    # exist on every platform (AttributeError).  Record the reason rather
    # than discarding it, so a missing handler can be diagnosed from the log.
    mylog.debug("Signal handlers not registered: %s", e)
class SetExceptionHandling(argparse.Action):
def __call__(self, parser, namespace, values, option_string = None):
# If we recognize one of the arguments on the command line as indicating a
# different mechanism for handling tracebacks, we attach one of those handlers
# and remove the argument from sys.argv.
#
if self.dest == "paste":
sys.excepthook = paste_traceback
mylog.debug("Enabling traceback pasting")
elif self.dest == "paste-detailed":
sys.excepthook = paste_traceback_detailed
mylog.debug("Enabling detailed traceback pasting")
elif self.dest == "detailed":
def create_field_info(self):
    """
    Build ``self.field_info`` and the structures derived from it.

    The steps run in a fixed order: bookkeeping containers are reset, the
    field-info object is instantiated from ``self._field_info_class``, fluid,
    particle and index fields are set up, a particle union named 'all' is
    added when no particle type of that name exists, field plugins are
    loaded, derived-field dependencies are recorded, and finally
    ``self.fields`` and the index's sorted field list are populated.
    """
    # Start from empty bookkeeping every time this is called.
    self.field_dependencies = {}
    self.derived_field_list = []
    self.filtered_particle_types = []

    # Instantiate the container first; the coordinate handler needs it.
    self.field_info = self._field_info_class(self, self.field_list)
    self.coordinates.setup_fields(self.field_info)

    # Fluid fields, then one pass per particle type, then index fields.
    self.field_info.setup_fluid_fields()
    for particle_type in self.particle_types:
        self.field_info.setup_particle_fields(particle_type)
    self.field_info.setup_fluid_index_fields()

    # Provide an 'all' union unless a particle type already uses that name.
    union_missing = "all" not in self.particle_types
    if union_missing:
        mylog.debug("Creating Particle Union 'all'")
        raw_types = list(self.particle_types_raw)
        union_all = ParticleUnion("all", raw_types)
        n_common = self.add_particle_union(union_all)
        if n_common == 0:
            mylog.debug("zero common fields: skipping particle union 'all'")
    self.field_info.setup_extra_union_fields()

    mylog.debug("Loading field plugins.")
    self.field_info.load_all_plugins()

    # Only the dependency mapping is kept; the second item is not used here.
    dependencies, _unloaded = self.field_info.check_derived_fields()
    self.field_dependencies.update(dependencies)

    self.fields = FieldTypeContainer(self)
    self.index.field_list = sorted(self.field_list)
def _reconstruct_parent_child(self):
    """
    Rebuild the ``Children`` and ``Parent`` links between grids.

    The first pass assigns each grid its ``Children`` list; the second pass
    appends each grid to the ``Parent`` list of every one of its children.
    """
    # Scratch buffer reused for every grid.  It is handed to
    # get_box_grids_level as the last argument and read back afterwards, so
    # that helper is presumably filling it in place -- its return value is
    # not used.
    selection = np.empty(len(self.grids), dtype='int32')

    mylog.debug("First pass; identifying child grids")
    for index, parent in enumerate(self.grids):
        left_edge = self.grid_left_edge[index, :]
        right_edge = self.grid_right_edge[index, :]
        get_box_grids_level(left_edge, right_edge,
                            self.grid_levels[index] + 1,
                            self.grid_left_edge, self.grid_right_edge,
                            self.grid_levels, selection)
        candidates = self.grids[selection.astype("bool")]
        # Keep only the selected grids that sit exactly one level deeper.
        parent.Children = [
            candidate for candidate in candidates
            if candidate.Level == parent.Level + 1
        ]

    mylog.debug("Second pass; identifying parents")
    for parent in self.grids:
        for child in parent.Children:
            child.Parent.append(parent)
def receive_and_reduce(comm, incoming_rank, image, add_to_front):
    """
    Receive an image from another rank and combine it with the local one.

    Parameters
    ----------
    comm : object
        Communicator providing ``recv_array(source, tag)``.
    incoming_rank : int
        Rank to receive from; it is also used as the message tag.
    image : ndarray
        Local image with three axes; the last axis is the channel axis.
        On the three-channel path it is updated in place.
    add_to_front : bool
        When true the received array is the front layer and ``image`` the
        back layer; otherwise the roles are swapped.

    Returns
    -------
    ndarray
        ``image`` holding the element-wise sum of the local and received
        arrays, when ``image`` has exactly three channels.
    """
    mylog.debug('Receiving image from %04i' % incoming_rank)
    # The received buffer is given the same three-axis shape as the local
    # image before the two are combined.
    arr2 = comm.recv_array(incoming_rank, incoming_rank).reshape(
        (image.shape[0], image.shape[1], image.shape[2]))

    if add_to_front:
        front = arr2
        back = image
    else:
        front = image
        back = arr2

    if image.shape[2] == 3:
        # Assume Projection Camera, Add.
        # Sum both layers into the local buffer.  Adding `front` to `image`
        # is only correct when the received array is the front layer; with
        # add_to_front false, `front` is `image` itself, so the local image
        # would be doubled and the received data dropped.
        np.add(front, back, image)
        return image
    # NOTE(review): images whose channel count is not 3 fall through here
    # without being combined; confirm whether a compositing path belongs
    # after this point.
if nvar > 11:
fields = ["Density",
"x-velocity", "y-velocity", "z-velocity",
"x-Bfield-left", "y-Bfield-left", "z-Bfield-left",
"x-Bfield-right", "y-Bfield-right", "z-Bfield-right",
"Pressure", "Metallicity"]
mylog.debug("No fields specified by user; automatically setting fields array to %s"
% str(fields))
# Allow some wiggle room for users to add too many variables
count_extra = 0
while len(fields) < nvar:
fields.append("var"+str(len(fields)))
count_extra += 1
if count_extra > 0:
mylog.debug('Detected %s extra fluid fields.' % count_extra)
cls.field_list = [(cls.ftype, e) for e in fields]
cls.set_detected_fields(ds, fields)
return fields
coords = np.zeros((nxm, nym, nzm, 3), dtype="float64", order="C")
coords[:,:,:,0] = x[:,None,None]
coords[:,:,:,1] = y[None,:,None]
coords[:,:,:,2] = z[None,None,:]
coords.shape = (nxm * nym * nzm, 3)
cycle = np.rollaxis(np.indices((nxm-1,nym-1,nzm-1)), 0, 4)
cycle.shape = ((nxm-1)*(nym-1)*(nzm-1), 3)
off = _cis + cycle[:, np.newaxis]
connectivity = ((off[:,:,0] * nym) + off[:,:,1]) * nzm + off[:,:,2]
mesh = AthenaPPLogarithmicMesh(i, self.index_filename, connectivity,
coords, self, bc[i],
np.array([nxm-1, nym-1, nzm-1]))
self.meshes.append(mesh)
pbar.update(i)
pbar.finish()
mylog.debug("Done setting up meshes.")
def __init__(cls, name, b, d):
    """
    Register a newly created class in ``simulation_time_series_registry``.

    The registry key is the part of the class name that precedes the first
    occurrence of 'Simulation' (a class named 'FooSimulation' is stored
    under 'Foo').  A class is not registered when its name does not contain
    'Simulation' or when nothing precedes it.

    Parameters
    ----------
    name : str
        Name of the class being created.
    b : tuple
        Base classes, forwarded to ``type.__init__``.
    d : dict
        Class namespace, forwarded to ``type.__init__``.
    """
    type.__init__(cls, name, b, d)
    # str.find() returns -1 when the marker is absent, and slicing with
    # name[:-1] would then register the class under its own name minus the
    # last character.  partition() reports explicitly whether the marker
    # was found.
    code_name, marker, _ = name.partition('Simulation')
    if marker and code_name:
        simulation_time_series_registry[code_name] = cls
        mylog.debug("Registering simulation: %s as %s", code_name, cls)
def find_min(self, field):
    """
    Find the minimum of ``field`` and where it occurs.

    The search runs over the object returned by ``self.all_data()``.
    Returns a ``(value, location)`` pair; the location is built with
    ``self.arr`` as float64 and converted to 'code_length'.
    """
    mylog.debug("Searching for minimum value of %s", field)
    everything = self.all_data()
    minimum, x_at_min, y_at_min, z_at_min = \
        everything.quantities.min_location(field)
    position = self.arr([x_at_min, y_at_min, z_at_min], dtype="float64")
    location = position.to('code_length')
    mylog.info("Min Value is %0.5e at %0.16f %0.16f %0.16f",
               minimum, location[0], location[1], location[2])
    return minimum, location
def send_to_parent(comm, outgoing_rank, image):
    """
    Send ``image`` to ``outgoing_rank`` through ``comm``.

    The message is tagged with the sender's own rank (``comm.rank``).
    """
    announcement = 'Sending image to %04i' % outgoing_rank
    mylog.debug(announcement)
    sender_tag = comm.rank
    comm.send_array(image, outgoing_rank, tag=sender_tag)