How to use the cameo.config.default_view object in cameo

To help you get started, we’ve selected a few examples of cameo.config.default_view, based on popular ways it is used in public projects.

Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.

github biosustain / cameo / tests / test_strain_design_heuristics.py View on Github external
def test_initializer(self):
        """Check that EvaluatorWrapper is callable and keeps its view and evaluator."""
        def evaluation_function(x):
            return 1

        wrapper = EvaluatorWrapper(config.default_view, evaluation_function)

        # The wrapper must be callable and expose both constructor arguments.
        for attribute in ('__call__', 'view', 'evaluator'):
            assert hasattr(wrapper, attribute)

        # The constructor arguments must be stored unchanged.
        assert wrapper.view == config.default_view
        assert wrapper.evaluator == evaluation_function
github biosustain / cameo / tests / test_api.py View on Github external
def test_api():
    """Smoke-test the design API end to end for the ``ser__L_c`` metabolite.

    Predicts pathways on a single mock host, optimizes strains for them with
    the default view, and checks that the resulting reports survive a pickle
    round trip and are not empty.
    """
    host = Host(
        'core',
        models=['e_coli_core'],
        biomass=['BIOMASS_Ecoli_core_w_GAM'],
        carbon_sources=['EX_glc__D_e']
    )

    api.design.debug = True
    predicted = api.design.predict_pathways(
        product=UNIVERSALMODEL.metabolites.ser__L_c,
        hosts=[host],
        database=UNIVERSALMODEL,
        aerobic=True
    )
    reports = api.design.optimize_strains(predicted, config.default_view, aerobic=True)

    # Round-trip through pickle to make sure the reports are serializable.
    serialized = pickle.dumps(reports)
    pickle.loads(serialized)

    assert len(reports) > 0
github biosustain / cameo / tests / test_strain_design_heuristics.py View on Github external
def test_initializer(self):
        """Check that EvaluatorWrapper is callable and keeps its view and evaluator."""
        def evaluation_function(x):
            return 1

        wrapper = EvaluatorWrapper(config.default_view, evaluation_function)

        # The wrapper must be callable and expose both constructor arguments.
        for attribute in ('__call__', 'view', 'evaluator'):
            assert hasattr(wrapper, attribute)

        # The constructor arguments must be stored unchanged.
        assert wrapper.view == config.default_view
        assert wrapper.evaluator == evaluation_function
github biosustain / cameo / cameo / basics.py View on Github external
def flux_variability_analysis(model, reactions=None, view=config.default_view):
    """Run flux-variability analysis, spreading reaction chunks over ``view``.

    Parameters
    ----------
    model
        Model handed to ``FvaFunctionObject``.
    reactions : iterable, optional
        Reactions to analyse. When None, ``model.reactions`` is used.
    view
        Parallelization view; it must support ``len()`` and ``map()``.

    Returns
    -------
    dict
        The per-chunk results merged into a single dictionary.
    """
    targets = model.reactions if reactions is None else reactions

    # One partition request sized by the view; chunks are consumed lazily.
    chunks = (chunk for chunk in partition(targets, len(view)))
    worker = FvaFunctionObject(model)

    merged = {}
    for chunk_result in view.map(worker, chunks):
        merged.update(chunk_result)
    return merged
github biosustain / cameo / cameo / strain_design / deterministic / flux_variability_based.py View on Github external
"has zero flux in the reference state. Please choose another "
                    "one.".format(self.normalize_ranges_by)
                )

        with TimeMachine() as tm:
            # Make sure that the design_space_model is initialized to its original state later
            for variable in self.variables:
                reaction = self.design_space_model.reactions.get_by_id(variable)
                tm(do=int, undo=partial(setattr, reaction, 'lower_bound', reaction.lower_bound))
                tm(do=int, undo=partial(setattr, reaction, 'upper_bound', reaction.upper_bound))
            target_reaction = self.design_space_model.reactions.get_by_id(self.objective)
            tm(do=int, undo=partial(setattr, target_reaction, 'lower_bound', target_reaction.lower_bound))
            tm(do=int, undo=partial(setattr, target_reaction, 'upper_bound', target_reaction.upper_bound))

            if view is None:
                view = config.default_view
            else:
                view = view

            self._init_search_grid(surface_only=surface_only, improvements_only=improvements_only)

            func_obj = _DifferentialFvaEvaluator(
                self.design_space_model,
                self.variables,
                self.objective,
                self.included_reactions
            )
            if progress:
                progress = ProgressBar(len(self.grid))
                results = list(progress(view.imap(func_obj, self.grid.iterrows())))
            else:
                results = list(view.map(func_obj, self.grid.iterrows()))
github opencobra / driven / driven / flux_analysis / fit_objective / optimization.py View on Github external
    def run(self, view=config.default_view, maximize=False, **kwargs):
        """Evolve candidates with the configured heuristic method.

        Parameters
        ----------
        view
            Parallelization view passed through to ``evolve``.
        maximize : bool
            Passed through to ``evolve`` as ``maximize``.
        **kwargs
            Extra keyword arguments forwarded unchanged to ``evolve``.

        Returns
        -------
        Whatever ``self._heuristic_method.evolve`` returns.
        """
        evolve = self._heuristic_method.evolve
        return evolve(
            generator=self._generator,
            maximize=maximize,
            representation=self.candidates,
            bounder=zero_one_bounder,
            evaluator=self._evaluator,
            binary=self.binary,
            view=view,
            **kwargs
        )
github biosustain / cameo / cameo / api / designer.py View on Github external
def __call__(self, product='L-glutamate',
                 hosts=HOSTS,
                 database=None,
                 aerobic=True,
                 view=config.default_view):
        """The works.

        The following workflow will be followed to determine suitable
        metabolic engineering strategies for a desired product:

        - Determine production pathways for desired product and host organisms.
          Try a list of default hosts if no hosts are specified.
        - Determine maximum theoretical yields and production envelopes for
          all routes.
        - Determine if production routes can be coupled to growth.
        - Determine over-expression, down-regulation, and KO targets.

        Parameters
        ----------
        product : str or Metabolite
            The desired product.
github biosustain / cameo / cameo / flux_analysis / analysis.py View on Github external
If not None, fix the total sum of reaction fluxes to its minimum times a factor. Expected to be within [
        1;inf]. Higher factors increase flux variability to a certain point since the bound for the objective is
        still fixed.
    remove_cycles : bool
        If true, apply the CycleFreeFlux algorithm to remove loops from each simulated flux distribution.
    view: cameo.parallel.SequentialView or cameo.parallel.MultiprocessingView or ipython.cluster.DirectView
        A parallelization view.

    Returns
    -------
    pandas.DataFrame
        Pandas DataFrame containing the results of the flux variability analysis.

    """
    if view is None:
        view = config.default_view
    if reactions is None:
        reactions = model.reactions
    with model:
        if fraction_of_optimum > 0.:
            fix_objective_as_constraint(model, fraction=fraction_of_optimum)
        if pfba_factor is not None:
            # don't add the objective-constraint again so fraction_of_optimum=0
            fix_pfba_as_constraint(model, multiplier=pfba_factor, fraction_of_optimum=0)
        reaction_chunks = (chunk for chunk in partition(reactions, len(view)))
        if remove_cycles:
            func_obj = _FvaFunctionObject(model, _cycle_free_fva)
        else:
            func_obj = _FvaFunctionObject(model, _flux_variability_analysis)
        chunky_results = view.map(func_obj, reaction_chunks)
        solution = pandas.concat(chunky_results)
github biosustain / cameo / cameo / strain_design / heuristic / __init__.py View on Github external
    def run(self, view=config.default_view, **kwargs):
        """Evolve with the heuristic method, always maximizing.

        Parameters
        ----------
        view
            Parallelization view passed through to ``evolve``.
        **kwargs
            Extra keyword arguments forwarded unchanged to ``evolve``.

        Returns
        -------
        Whatever ``self.heuristic_method.evolve`` returns.
        """
        evolve = self.heuristic_method.evolve
        return evolve(
            generator=self._generator,
            maximize=True,
            view=view,
            evaluator=self._evaluator,
            **kwargs
        )
github biosustain / cameo / cameo / strain_design / heuristic / multiprocess / __init__.py View on Github external
    def run(self, view=config.default_view, **run_kwargs):
        """Dispatch a ``MultiprocessRunner`` over ``view`` and return the results.

        Parameters
        ----------
        view
            Parallelization view. ``len(view)`` sets how many client groups
            are built, and ``view.map`` dispatches the runner over them.
        **run_kwargs
            Keyword arguments handed to ``MultiprocessRunner``. Their ``view``
            entry is always set to a fresh ``parallel.SequentialView``.

        Returns
        -------
        Whatever ``view.map`` returns for the runner and the client groups.
        """
        # The runner is handed a sequential view; the outer `view` is only
        # used for the final `map` call below.
        run_kwargs['view'] = parallel.SequentialView()
        runner = MultiprocessRunner(self._island_class, self._init_kwargs(), self.migrator, run_kwargs)
        # Group i collects the i-th client of every observer. `range` is used
        # instead of `xrange`, which is not a builtin on Python 3.
        clients = [[o.clients[i] for o in self.observers] for i in range(len(view))]
        return view.map(runner, clients)