How to use the contextlib2.contextmanager function in contextlib2

To help you get started, we’ve selected a few contextlib2 examples, based on popular ways it is used in public projects.

Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.

github apache / incubator-superset / superset / sql_lab.py View on Github external
@contextmanager
def session_scope(nullpool):
    """Provide a transactional scope around a series of operations."""
    if nullpool:
        engine = sqlalchemy.create_engine(
            app.config["SQLALCHEMY_DATABASE_URI"], poolclass=NullPool
        )
        session_class = sessionmaker()
        session_class.configure(bind=engine)
        session = session_class()
    else:
        session = db.session()
        session.commit()  # HACK

    try:
        yield session
        session.commit()
github broadinstitute / viral-ngs / tools / cromwell.py View on Github external
    @contextlib.contextmanager
    def cromwell_server(self, port=8000, check_health=True):
        """Start a cromwell server and shut it down when the context ends.

        A temporary config file is written that sets ``webservice.port`` to
        `port`, and a ``self.CromwellServer`` is constructed from it.

        Args:
            port: port number written into the generated cromwell config.
            check_health: when true, ``server.is_healthy()`` is checked (via
                ``util.misc.chk``) before the server is handed to the caller.

        Yields:
            The ``self.CromwellServer`` instance.
        """
        with util.file.tempfname(suffix='.cromwell.conf') as cromwell_conf:
            util.file.dump_file(cromwell_conf, 'include required(classpath("application"))\nwebservice.port = {}\n'.format(port))
            _log.info('cromwell config file: %s', util.file.slurp_file(cromwell_conf))
            server = self.CromwellServer(cromwell_tool=self, port=port, config_file=cromwell_conf)
            # Everything after construction runs under try/finally so that the
            # server is shut down even when a startup step (sleep, auth lookup,
            # health check) raises before the caller ever gets the server.
            try:
                _log.info('Waiting for cromwell server to start up...')
                time.sleep(10)
                _log.info('IN CROMWELL, AUTH IS %s', server.auth)
                os.system('pstree')
                util.misc.chk(not check_health or server.is_healthy())
                yield server
            finally:
                server.shutdown()
    # end: def cromwell_server(self, port=8000, check_health=True)
github biocore / scikit-bio / skbio / io / util.py View on Github external
@contextmanager
def _resolve_file(file, **kwargs):
    """Resolve `file` via ``_resolve`` and close it afterwards if closeable.

    Yields the ``(file, source, is_binary_file)`` triple produced by
    ``_resolve(file, **kwargs)``. On exit, whether or not the body raised,
    the resolved file is closed when ``source.closeable`` is truthy.
    """
    handle, source, binary_flag = _resolve(file, **kwargs)
    try:
        yield (handle, source, binary_flag)
    finally:
        # Only close handles that the source marks as closeable.
        must_close = source.closeable
        if must_close:
            handle.close()
github uber / tchannel-python / tchannel / tornado / timeout.py View on Github external
@contextlib2.contextmanager
def timeout(future, seconds=0, io_loop=None):
    """Arrange for `future` to fail with ``TimeoutError`` after `seconds`.

    When `seconds` is falsy (e.g. 0) nothing is scheduled. Otherwise a
    callback is registered with ``io_loop.call_later``; when it fires it sets
    a ``TimeoutError`` on `future` only if the future is still running. The
    scheduled callback is not removed when the context exits.

    `io_loop` falls back to ``tornado.ioloop.IOLoop.current()`` when falsy.
    """
    # TODO: This is probably too heavy to attach to every request, should do
    # this in the background.
    if not io_loop:
        io_loop = tornado.ioloop.IOLoop.current()

    def _expire(*_args, **_kwargs):
        # A future that already finished must not be overwritten.
        if not future.running():
            return
        future.set_exception(TimeoutError())

    # No timeout if seconds is set to 0.
    if seconds:
        io_loop.call_later(seconds, _expire)
    yield
github nasimrahaman / antipasti-tf / Antipasti / utilities / utils.py View on Github external
    @contextmanager
    def manage(self, mode=None, **kwargs):
        if mode not in ['initialize', 'feedforward', None]:
            raise ValueError("Keyword `mode` must either be 'initialize' or 'feedforward'. Given: {}.".format(mode))

        assert not all([_csm is None for _csm in [self.initialize_csm, self.feedforward_csm]]), \
            "No context supermanager defined. Define either LayerContextSupermanagers.initialize_csm " \
            "or LayerContextSupermanagers.feedforward_csm before calling this method."

        # Find the right csm
        if mode == 'initialize':
            csm = self.initialize_csm
        elif mode == 'feedforward':
            csm = self.feedforward_csm
        else:
            if None not in [self.initialize_csm, self.feedforward_csm]:
                # None of the CSMs is None, so pick the default
github globrepo / ontology-alchemy / ontology_alchemy / session.py View on Github external
@contextmanager
def session_context():
    """Push a new ``Session`` onto ``Session.stack`` for the duration of the block.

    Yields:
        The newly created ``Session`` instance.

    The session is popped from ``Session.stack`` when the block exits, even
    if the body raises, so a failing block does not leave a stale session on
    the stack.
    """
    session = Session()
    Session.stack.append(session)
    try:
        yield session
    finally:
        # Without the finally, an exception in the body would skip the pop.
        Session.stack.pop()
github broadinstitute / viral-ngs / tools / git_annex.py View on Github external
    @contextlib.contextmanager
    def maybe_now(self, now=True):
        """Optionally open a fresh batching context.

        With a truthy `now`, enters ``self.batching()`` and yields whatever
        that context produces; otherwise yields `self` unchanged so that an
        already-active batching context is inherited.
        """
        if not now:
            yield self
            return

        with self.batching() as batch:
            yield batch
github derpston / python-graphiteudp / src / graphiteudp.py View on Github external
@contextmanager
def measure(metric, measure_func=time.time, reverse=False):
   """
   Function to allow usage of graphiteudp as both a context manager
   and a decorator. Takes an optional measure_func argument which is
   called before and after to obtain the `value` that's passed to
   graphiteudp.send. Defaults to time.time.

   Args:
      metric: The metric path to use.
      measure_func: The function that's called on entry and exit of the
         context or before and after a decorated function when used as
         a decorator.
      reverse: Reverse the order of arguments used when obtaining
         the difference.

   Examples:
github nasimrahaman / antipasti-tf / Antipasti / backend / core.py View on Github external
    @contextmanager
    def manage(self, parameter_tag=None, layer_id=None, device=None, variable_scope=None,
               name_scope=None, other_context_managers=None, reuse=None,
               reuse_layer_variable_scope=None, reuse_variable_scope=None):

        with ExitStack() as stack:
            # We're going to store all managers' yields in an ordered dict, which will in turn be (indirectly) yielded by
            # this manager (indirectly because this manager yields self for full access, and the manager_yields
            # ordered dict is assigned as the attribute 'scope_yields').
            _manager_yields = OrderedDict([])
            for manager_group, managers in self.get_managers(parameter_tag=parameter_tag, layer_id=layer_id,
                                                             device=device, variable_scope=variable_scope,
                                                             name_scope=name_scope,
                                                             other_context_managers=other_context_managers,
                                                             reuse=reuse,
                                                             reuse_layer_variable_scope=reuse_layer_variable_scope,
                                                             reuse_variable_scope=reuse_variable_scope).items():
github google / openhtf / openhtf / exe / phase_data.py View on Github external
  @contextlib2.contextmanager
  def RecordPhaseTiming(self, phase, test_state):
    """Context manager for the execution of a single phase."""

    # Check for measurement descriptors and track them in the PhaseRecord.
    measurement_map = {
        measurement.name: copy.deepcopy(measurement)
        for measurement in phase.measurements
    }
    # Populate dummy declaration list for frontend API.
    test_state.running_phase.measurements = {
        measurement.name: measurement._asdict()
        for measurement in measurement_map.itervalues()
    }
    test_state.phase_data.measurements = (
        measurements.Collection(measurement_map))
    test_state.phase_data.attachments = {}

contextlib2

Backports and enhancements for the contextlib module

Python-2.0
Latest version published 3 years ago

Package Health Score

79 / 100
Full package analysis

Similar packages