How to use the dockerflow.checks.Info function in dockerflow

To help you get started, we’ve selected a few dockerflow examples, based on popular ways it is used in public projects.

Source: mozilla/gcp-ingestion, ingestion-edge/ingestion_edge/dockerflow.py (GitHub)
def check_queue_size(q: SQLiteAckQueue):
    """Check queue size."""
    try:
        if q is not None:
            if q.size > 0:
                return [
                    checks.Info(
                        "queue contains pending messages", id=QUEUE_PENDING_INFO_ID
                    )
                ]
            elif q.unack_count() > 0:
                return [
                    checks.Info(
                        "queue contains unacked messages", id=QUEUE_UNACKED_INFO_ID
                    )
                ]
    except Exception:
        return [checks.Error("queue raised exception on access", id=QUEUE_ERROR_ID)]
    else:
        return []
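
The excerpt above depends on names defined elsewhere in its module (`checks`, `SQLiteAckQueue`, and the `QUEUE_*_ID` constants), so it cannot be run on its own. The following is a minimal, self-contained sketch of the same pattern, not the project's actual code. It assumes that `checks.Info` and `checks.Error` accept a message and an `id` keyword, as in the excerpt, and that the resulting objects expose an `id` attribute. The ID strings, `FakeQueue`, `BrokenQueue`, `check_queue`, and `summarize` are hypothetical names introduced for illustration, and the fallback classes are simple stand-ins used only when dockerflow is not installed.

```python
from types import SimpleNamespace

try:
    from dockerflow import checks  # the real library, if it is installed
except ImportError:
    # Stand-in so the sketch runs without dockerflow. This is NOT the real
    # API, only the two attributes (msg, id) that this sketch relies on.
    class _Message:
        def __init__(self, msg, id=None):
            self.msg = msg
            self.id = id

    class _Info(_Message):
        pass

    class _Error(_Message):
        pass

    checks = SimpleNamespace(Info=_Info, Error=_Error)

# Hypothetical IDs; the original module defines its own constants.
PENDING_ID = "example.queue.I001"
UNACKED_ID = "example.queue.I002"
ERROR_ID = "example.queue.E001"


class FakeQueue:
    """Hypothetical queue exposing only what the check reads."""

    def __init__(self, size=0, unacked=0):
        self.size = size
        self._unacked = unacked

    def unack_count(self):
        return self._unacked


class BrokenQueue:
    """Hypothetical queue whose backing store cannot be read."""

    @property
    def size(self):
        raise RuntimeError("database unavailable")


def check_queue(q):
    """Return a list of check messages; an empty list means nothing to report."""
    try:
        if q is None:
            return []
        if q.size > 0:
            return [checks.Info("queue contains pending messages", id=PENDING_ID)]
        if q.unack_count() > 0:
            return [checks.Info("queue contains unacked messages", id=UNACKED_ID)]
    except Exception:
        # Any failure while reading the queue is reported as an error.
        return [checks.Error("queue raised exception on access", id=ERROR_ID)]
    return []


def summarize(messages):
    """Collect the IDs of the returned messages for easy inspection."""
    return [m.id for m in messages]
```

Calling `summarize(check_queue(FakeQueue(size=3)))` yields the pending-messages ID, while an empty or missing queue yields an empty list. Returning a list in every branch keeps the caller's handling uniform, which matches the shape of the excerpt.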