How to use the typing.Dict generic type from the typing module

To help you get started, we’ve selected a few typing examples, based on popular ways it is used in public projects.

Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.

github tribe29 / checkmk / tests / unit / cmk / gui / test_userdb_ldap_connector.py View on Github external
"cn": ["Härry Hörsch"],
            "samaccountname": ["härry"],
            "userPassword": ["ldap-test"],
            "mail": ["härry@check-mk.org"],
        },
        "cn=sync-user,ou=users,dc=check-mk,dc=org": {
            "objectclass": ["user"],
            "objectcategory": ["person"],
            "dn": ["cn=sync-user,ou=users,dc=check-mk,dc=org"],
            "cn": ["sync-user"],
            "samaccountname": ["sync-user"],
            "userPassword": ["sync-secret"],
        },
    }

    groups: Dict[str, Dict[str, Union[str, List[str]]]] = {
        "cn=admins,ou=groups,dc=check-mk,dc=org": {
            "objectclass": ["group"],
            "objectcategory": ["group"],
            "dn": ["cn=admins,ou=groups,dc=check-mk,dc=org"],
            "cn": ["admins"],
            "member": ["cn=admin,ou=users,dc=check-mk,dc=org",],
        },
        u"cn=älle,ou=groups,dc=check-mk,dc=org": {
            "objectclass": ["group"],
            "objectcategory": ["group"],
            "dn": [u"cn=älle,ou=groups,dc=check-mk,dc=org"],
            "cn": ["alle"],
            "member": [
                "cn=admin,ou=users,dc=check-mk,dc=org",
                "cn=härry,ou=users,dc=check-mk,dc=org",
            ],
github nuxeo / nuxeo-drive / tests / unit / test_autolock.py View on Github external
"""
from pathlib import Path
from typing import Dict, List, Tuple
from unittest.mock import Mock, patch

import pytest
import nxdrive.autolocker

from .. import ensure_no_exception


class DAO:
    """Minimal ManagerDAO for a working Auto-Lock.

    Locked paths are kept in an in-memory dict that belongs to the
    instance, so two ``DAO`` objects never see each other's locks.
    """

    # {path: (process, doc_id)}
    paths: Dict[str, Tuple[int, str]]

    def __init__(self) -> None:
        # Create the storage per instance: a dict literal in the class body
        # would be one shared object, leaking locks from one DAO to the next.
        self.paths = {}

    def get_locked_paths(self) -> List[str]:
        """Return the locked paths as a new list."""
        return list(self.paths)

    def lock_path(self, path: str, process: int, doc_id: str) -> None:
        """Record *path* as locked, storing the ``(process, doc_id)`` pair."""
        self.paths[path] = (process, doc_id)

    def unlock_path(self, path: str) -> None:
        """Remove *path* from the locked paths; unknown paths are ignored."""
        self.paths.pop(path, None)


@pytest.fixture(scope="function")
def autolock(tmpdir):
    check_interval = 5
    autolocker = nxdrive.autolocker.ProcessAutoLockerWorker(
        check_interval, DAO(), Path(tmpdir)
github psf / black / blib2to3 / pgen2 / pgen.py View on Github external
for label, next in state.arcs.items():
            if label in self.dfas:
                if label in self.first:
                    fset = self.first[label]
                    if fset is None:
                        raise ValueError("recursion for rule %r" % name)
                else:
                    self.calcfirst(label)
                    fset = self.first[label]
                    assert fset is not None
                totalset.update(fset)
                overlapcheck[label] = fset
            else:
                totalset[label] = 1
                overlapcheck[label] = {label: 1}
        inverse: Dict[str, str] = {}
        for label, itsfirst in overlapcheck.items():
            for symbol in itsfirst:
                if symbol in inverse:
                    raise ValueError(
                        "rule %s is ambiguous; %s is in the"
                        " first sets of %s as well as %s"
                        % (name, symbol, label, inverse[symbol])
                    )
                inverse[symbol] = label
        self.first[name] = totalset
github microsoft / botbuilder-python / samples / 16.proactive-messages / app.py View on Github external
label="TurnError",
            name="on_turn_error Trace",
            timestamp=datetime.utcnow(),
            type=ActivityTypes.trace,
            value=f"{error}",
            value_type="https://www.botframework.com/schemas/error",
        )
        # Send a trace activity, which will be displayed in Bot Framework Emulator
        await context.send_activity(trace_activity)


ADAPTER.on_turn_error = on_error

# Create a shared dictionary.  The Bot will add conversation references when users
# join the conversation and send messages.
CONVERSATION_REFERENCES: Dict[str, ConversationReference] = dict()

# If the channel is the Emulator, and authentication is not in use, the AppId will be null.
# We generate a random AppId for this case only. This is not required for production, since
# the AppId will have a value.
APP_ID = SETTINGS.app_id if SETTINGS.app_id else uuid.uuid4()

# Create the Bot
BOT = ProactiveBot(CONVERSATION_REFERENCES)

# Listen for incoming requests on /api/messages.
@APP.route("/api/messages", methods=["POST"])
def messages():
    # Main bot message handler.
    if "application/json" in request.headers["Content-Type"]:
        body = request.json
    else:
github CoinAlpha / hummingbot / hummingbot / core / data_type / remote_api_order_book_data_source.py View on Github external
    async def get_tracking_pairs(self) -> Dict[str, OrderBookTrackerEntry]:
        auth: aiohttp.BasicAuth = aiohttp.BasicAuth(login=conf.coinalpha_order_book_api_username,
                                                    password=conf.coinalpha_order_book_api_password)
        client_session: aiohttp.ClientSession = await self.get_client_session()
        response: aiohttp.ClientResponse = await client_session.get(self.SNAPSHOT_REST_URL, auth=auth)
        timestamp: float = time.time()
        if response.status != 200:
            raise EnvironmentError(f"Error fetching order book tracker snapshot from {self.SNAPSHOT_REST_URL}.")

        binary_data: bytes = await response.read()
        order_book_tracker_data: Dict[str, Tuple[pd.DataFrame, pd.DataFrame]] = pickle.loads(binary_data)
        retval: Dict[str, OrderBookTrackerEntry] = {}

        for trading_pair, (bids_df, asks_df) in order_book_tracker_data.items():
            order_book: BinanceOrderBook = BinanceOrderBook()
            order_book.apply_numpy_snapshot(bids_df.values, asks_df.values)
            retval[trading_pair] = OrderBookTrackerEntry(trading_pair, timestamp, order_book)
github thu-coai / cotk / cotk / dataloader / vocab.py View on Github external
def __init__(self):
		"""Initialise an empty vocabulary object in the ``"init"`` mode."""
		super().__init__()
		# SHA-256 hex digest of the serialised [class name, "configs"] pair.
		# NOTE(review): `dumps` is defined outside this snippet; sha256 needs
		# bytes-like input, so it presumably returns bytes — confirm.
		self._setting_hash = hashlib.sha256(
			dumps([self.__class__.__name__, "configs"])
		).hexdigest()
		# Starts as an empty Counter.
		self._token_counter = Counter()
		# Both start as None even though the annotations are not Optional;
		# presumably they are filled in later, outside this snippet.
		self._all_vocab_list: List[str] = None
		self.word2id: Dict[str, int] = None
		self.mode = "init"
github wtsi-hgi / python-hgijson / hgijson / custom_types.py View on Github external
from typing import TypeVar, Dict, List, Tuple, Union

# The string passed to TypeVar must equal the name it is assigned to
# (PEP 484); type checkers reject a mismatch.
SerializableType = TypeVar("SerializableType")

# TODO: Why is this needed?
PrimitiveUnionType = TypeVar("PrimitiveUnionType")

# Union of the built-in types accepted as a primitive JSON value here.
PrimitiveJsonType = Union[Dict, List, Tuple, str, int, float, bool, None]
github home-assistant / home-assistant-cli / homeassistant_cli / plugins / raw.py View on Github external
WSTYPE is name of websocket methods.

    \b
    --json is dictionary to pass in addition to the type.
           Example: --json='{ "area_id":"2c8bf93c8082492f99c989896962f207" }'
    """
    if json:
        data = json_.loads(json)
    else:
        data = {}

    frame = {'type': wstype}
    frame = {**frame, **data}  # merging data into frame

    response = cast(List[Dict[str, Any]], api.wsapi(ctx, frame))

    ctx.echo(format_output(ctx, response))
github mozilla / glean_parser / glean_parser / markdown.py View on Github external
#
    # {
    #  "baseline": [
    #    { ... metric data ... },
    #    ...
    #  ],
    #  "metrics": [
    #    { ... metric data ... },
    #    ...
    #  ],
    #  ...
    # }
    #
    # This also builds a dictionary of custom pings, if available.
    custom_pings_cache: Dict[str, pings.Ping] = defaultdict()
    metrics_by_pings: Dict[str, List[metrics.Metric]] = defaultdict(list)
    for category_key, category_val in objs.items():
        for obj in category_val.values():
            # Filter out custom pings. We will need them for extracting
            # the description
            if isinstance(obj, pings.Ping):
                custom_pings_cache[obj.name] = obj
                # Pings that have `send_if_empty` set to true,
                # might not have any metrics. They need to at least have an
                # empty array of metrics to show up on the template.
                if obj.send_if_empty and not metrics_by_pings[obj.name]:
                    metrics_by_pings[obj.name] = []

            # If this is an internal Glean metric, and we don't
            # want docs for it.
            if isinstance(obj, metrics.Metric) and not obj.is_internal_metric():
                # If we get here, obj is definitely a metric we want
github mgurdal / aegis / aegis / authenticators / basic.py View on Github external
    async def authenticate(self, request: web.Request) -> Dict[str, Any]:
        """Returns JSON serializable user"""