Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
import logging
from functools import partial
from typing import Callable, Type, TypeVar
from yarl import URL
import aiormq
from aiormq.tools import censor_url
from .channel import Channel
from .tools import CallbackCollection
from .types import TimeoutType
# Register the AMQP schemes in yarl's default-port table: 5672 for "amqp"
# and 5671 for "amqps".
try:
    from yarl import DEFAULT_PORTS

    DEFAULT_PORTS['amqp'] = 5672
    DEFAULT_PORTS['amqps'] = 5671
except ImportError:
    # Best effort: if `DEFAULT_PORTS` cannot be imported from yarl, the
    # defaults are simply not registered and module import continues.
    pass

# Module-level logger named after this module.
log = logging.getLogger(__name__)
class Connection:
""" Connection abstraction """
CHANNEL_CLASS = Channel
KWARGS_TYPES = ()
@property
def is_closed(self):
from .base import Base, task
from .channel import Channel
from .tools import censor_url
from .types import ArgumentsType, SSLCerts, URLorStr
from .version import __version__
# Module-level logger named after this module.
log = logging.getLogger(__name__)
# Pair of frame classes taken from `spec.Channel`; the name suggests they are
# the frames treated as a channel-close response.
# NOTE(review): `spec` is not imported in the visible lines — confirm where
# it comes from and how this tuple is used.
CHANNEL_CLOSE_RESPONSES = (spec.Channel.Close, spec.Channel.CloseOk)
# Register the AMQP schemes in yarl's default-port table: 5672 for "amqp"
# and 5671 for "amqps".
try:
    from yarl import DEFAULT_PORTS

    DEFAULT_PORTS["amqp"] = 5672
    DEFAULT_PORTS["amqps"] = 5671
except ImportError:
    # Best effort: if `DEFAULT_PORTS` cannot be imported from yarl, the
    # defaults are simply not registered and module import continues.
    pass
# Product name constant.
PRODUCT = "aiormq"

# Interpreter description built from the `platform` module: implementation
# name, version, then the two values returned by `platform.python_build()`
# (build number and build date) unpacked into the last two placeholders.
PLATFORM = "{} {} ({} build {})".format(
    platform.python_implementation(),
    platform.python_version(),
    *platform.python_build()
)
def parse_bool(v: str) -> bool:
    """Interpret a textual flag as a boolean.

    The literal ``"1"`` and, compared case-insensitively, ``"true"``,
    ``"yes"``, ``"y"``, ``"enable"`` and ``"on"`` are truthy. Every other
    string, including the empty string, is falsy. The value is not stripped,
    so surrounding whitespace makes it falsy.

    :param v: the raw string value to interpret
    :returns: ``True`` for a recognised truthy spelling, otherwise ``False``
    """
    return v == "1" or v.lower() in ("true", "yes", "y", "enable", "on")