Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
from plone.app.portlets.portlets import base
from plone.portlets.interfaces import IPortletDataProvider
from zope.interface import implementer
import logging
import warnings
logging.captureWarnings(True)
class ILatestSectionableNITFPortlet(IPortletDataProvider):
    """BBB: data provider interface of the deprecated
    "Latest Sectionable NITF" portlet.
    """
@implementer(ILatestSectionableNITFPortlet)
class Assignment(base.Assignment):
    """BBB: assignment of the deprecated "Latest Sectionable NITF" portlet."""
class Renderer(base.Renderer):
    """BBB: renderer of the deprecated "Latest Sectionable NITF" portlet."""

    def render(self):
        """Emit a ``DeprecationWarning`` naming the current context.

        Nothing is rendered; the method only warns that the assignment
        has to be removed by hand.
        """
        template = (
            'Use of portlet "Latest Sectionable NITF" is deprecated; '
            'remove assignment manually from {0}'
        )
        warnings.warn(template.format(self.context), DeprecationWarning)
class AddForm(base.AddForm):
    """BBB: add form of the deprecated "Latest Sectionable NITF" portlet."""
self.result.log.append((ts, self.peerIndex, sessionId, msg.encode('utf8')))
return ts
def clientConnectionLost(self, connector, reason):
    """Log the lost connection and fire ``onGone`` with ``None``.

    ``reason.value`` is converted to text for the log message;
    ``connector`` is not used.
    """
    detail = str(reason.value)
    self.log("Client connection lost: %s" % (detail,))
    self.onGone.callback(None)
def clientConnectionFailed(self, connector, reason):
    """Log the failed connection and fire ``onGone`` with the failure text.

    ``reason.value`` is converted to text once; that same text goes into
    the log message and is passed to the ``onGone`` callback.
    ``connector`` is not used.
    """
    detail = str(reason.value)
    self.log("Client connection failed: %s" % (detail,))
    self.onGone.callback(detail)
@implementer(ITestCase)
class WampCase:
factory = None
index = None
description = None
expectation = None
params = None
def __init__(self, testee, spec):
self.testee = testee
self.spec = spec
self._uriSuffix = '#' + str(random.randint(0, 1000000))
if self.testee.options.has_key('rtt'):
Get filename extension. Caution: some viruses don't put the filename
in the 'Content-Disposition' header.
"""
fext = ''
filename = m.get_filename('') or m.get_param('name', '')
if filename:
fext = splitext(oneline(filename,'utf-8'))[1]
if len(fext) > 1:
fext = fext[1:]
else:
fext = ''
return fext
@implementer(IHandler)
class MIMEDelete:
    """Handler that filters the MIME content of messages."""

    name = 'mime-delete'
    description = _('Filter the MIME content of messages.')

    def process(self, mlist, msg, msgdata):
        """Delegate to the module-level ``process`` unless filtering is skipped.

        Nothing is done when ``mlist.filter_content`` is false or when
        ``msgdata`` carries a truthy ``'isdigest'`` entry.
        """
        # Short-circuit: same checks, in the same order, as two early returns.
        skip = not mlist.filter_content or msgdata.get('isdigest')
        if skip:
            return
        process(mlist, msg, msgdata)
print("pausing", drain)
subPauses.append(drain.fount.pauseFlow())
print("done.")
return _AggregatePause(subPauses)
def stopFlow(self):
    """
    Call C{stopFlow} on the fount of every drain in C{self._in._drains}.
    """
    for inDrain in self._in._drains:
        upstream = inDrain.fount
        upstream.stopFlow()
@implementer(IPause)
class _AggregatePause(object):
    """
    A pause which aggregates several other pauses.
    """

    def __init__(self, subPauses):
        """
        Create an L{_AggregatePause} for other pauses.

        @param subPauses: an iterable of pause objects; each one must
            provide an C{unpause()} method.
        """
        self._subPauses = subPauses

    def unpause(self):
        """
        Un-pause all pauses composed within this L{_AggregatePause}.
        """
        # Without this loop the aggregated pauses would never be released,
        # leaving every paused fount stuck.
        for subPause in self._subPauses:
            subPause.unpause()
def record_last_sync(context):
    """Store the current ``datetime.utcnow()`` value as ``context.last_sync``."""
    now = datetime.utcnow()
    context.last_sync = now
def get_last_sync(context):
    """Return ``last_sync`` read from ``aq_base(context)``, or ``None`` if unset."""
    unwrapped = aq_base(context)
    return getattr(unwrapped, 'last_sync', None)
class IUserProfileManager(Interface):
    """Interface for syncing the properties of a user profile."""
@adapter(IUserProfile)
@implementer(IUserProfileManager)
class UserPropertyManager(object):
def __init__(self, context):
self.context = context
self.property_mapping = api.portal.get_registry_record(
PROP_SHEET_MAP_KEY)
def turn_properties_plugin_on(self):
""" Turn on properties plugin if deactivated. Returns state.
"""
plugins = api.portal.get_tool(name='acl_users').plugins
plugin_id = api.portal.get_registry_record(PRI_EXT_USERS_KEY)
if not plugin_id or not getattr(plugins, plugin_id, None):
return
if plugin_id not in \
'api_version': self.config.get('api_version', 'v1'),
'auth_client': auth_client,
'default_zone_prefix': self.config.get('default_zone_prefix', '')
}
return gdns.GDNSClient(**kwargs)
def build_reconciler(self):
    """Validate the config, then build a :class:`.GDNSReconciler`.

    The DNS client handed to the reconciler comes from ``_init_client``,
    which is given the result of ``_init_auth``.

    Returns:
        The :class:`.GDNSReconciler` created from this builder's config,
        metrics, DNS client, channels and extra keyword arguments.
    """
    self._validate_config()
    dns_client = self._init_client(self._init_auth())
    return GDNSReconciler(
        self.config,
        self.metrics,
        dns_client,
        self.rrset_channel,
        self.changes_channel,
        **self.kwargs
    )
@zope.interface.implementer(interfaces.IReconciler)
class GDNSReconciler:
"""Validate current records in DNS against desired source of truth.
:class:`.GDNSReconciler` will create a change message for the
configured publisher client plugin to consume if there is a
discrepancy between records in Google Cloud DNS and the desired
state.
Once validation is done, the Reconciler will emit a ``None`` message
to the ``changes_channel`` queue, signalling a Publisher client
(e.g. :class:`.GPubsubPublisher`) to publish the
message to a pub/sub to which `Gordon `_ subscribes.
Args:
config (dict): Google Cloud DNS-related configuration.
import logging
import os.path
import plone.dexterity.schema
import six
@implementer(IDexterityFTIModificationDescription)
class DexterityFTIModificationDescription(object):
    """Holds the ``attribute`` and ``oldValue`` given at construction."""

    def __init__(self, attribute, oldValue):
        self.attribute, self.oldValue = attribute, oldValue
@implementer(IDexterityFTI)
class DexterityFTI(base.DynamicViewTypeInformation):
"""A Dexterity FTI
"""
meta_type = "Dexterity FTI"
behaviors_type = 'ulines'
if six.PY2:
behaviors_type = 'lines'
_properties = base.DynamicViewTypeInformation._properties + (
{
'id': 'add_permission',
'type': 'selection',
'select_variable': 'possiblePermissionIds',
'mode': 'w',
Set the stream used to log output from tests.
"""
self._testStream = stream
@implementer(IAddress)
class LocalWorkerAddress(object):
    """
    An L{IAddress} implementation used as a stub address for
    L{ITransport.getPeer} and L{ITransport.getHost}.
    """
@implementer(ITransport)
class LocalWorkerTransport(object):
    """
    A stub transport implementation that lets L{AMP} run over a
    L{ProcessProtocol} transport.
    """

    def __init__(self, transport):
        self._transport = transport

    def write(self, data):
        """
        Pass C{data} on to the wrapped transport with C{writeToChild},
        addressed to C{_WORKER_AMP_STDIN}.
        """
        wrapped = self._transport
        wrapped.writeToChild(_WORKER_AMP_STDIN, data)
import logging
from lexicon.providers import dnsmadeeasy
import zope.interface
from certbot import errors
from certbot import interfaces
from certbot.plugins import dns_common
from certbot.plugins import dns_common_lexicon
logger = logging.getLogger(__name__)
ACCOUNT_URL = 'https://cp.dnsmadeeasy.com/account/info'
@zope.interface.implementer(interfaces.IAuthenticator)
@zope.interface.provider(interfaces.IPluginFactory)
class Authenticator(dns_common.DNSAuthenticator):
"""DNS Authenticator for DNS Made Easy
This Authenticator uses the DNS Made Easy API to fulfill a dns-01 challenge.
"""
description = ('Obtain certificates using a DNS TXT record (if you are using DNS Made Easy for '
'DNS).')
ttl = 60
def __init__(self, *args, **kwargs):
super(Authenticator, self).__init__(*args, **kwargs)
self.credentials = None
@classmethod
# -*- coding: utf-8 -*-
from Products.CMFPlone.interfaces import IMaintenanceSchema
from Products.CMFPlone.interfaces import IPloneSiteRoot
from plone.registry.interfaces import IRegistry
from zope.component import adapts
from zope.component import getUtility
from zope.interface import implementer
@implementer(IMaintenanceSchema)
class MaintenanceControlPanelAdapter(object):
    """Adapter exposing the ``days`` maintenance setting from the registry.

    The settings proxy is looked up once, at construction, through
    ``IRegistry.forInterface`` with the ``plone`` prefix.
    """

    adapts(IPloneSiteRoot)

    def __init__(self, context):
        self.context = context
        self.maintenance_settings = getUtility(IRegistry).forInterface(
            IMaintenanceSchema, prefix="plone")

    def get_days(self):
        """Return the ``days`` value held by the maintenance settings."""
        settings = self.maintenance_settings
        return settings.days

    def set_days(self, value):
        """Write ``value`` to the ``days`` maintenance setting."""
        settings = self.maintenance_settings
        settings.days = value