Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def __init__(self, identity=None, peer_identity=None,
context=None, io_loop=None,
security_plugin='noop_auth_backend',
public_key=None, secret_key=None,
peer_public_key=None, timeout=5,
password=None,
heartbeat_plugin='noop_heartbeat_backend',
proxy_to=None,
registry=None):
self.identity = identity
self.context = context or self._make_context()
self.peer_identity = peer_identity
self.security_plugin = security_plugin
self.future_pool = {}
self.initialized = False
self.auth_backend = zope.component.getAdapter(self,
IAuthenticationBackend,
name=self.security_plugin
)
self.public_key = public_key
self.secret_key = secret_key
self.peer_public_key = peer_public_key
self.password = password
self.timeout = timeout
self.heartbeat_backend = zope.component.getAdapter(
self,
IHeartbeatBackend,
name=heartbeat_plugin)
self.proxy_to = proxy_to
self._backend_init(io_loop=io_loop)
self.reader = None
self.registry = (registry if registry is not None
sm = portal.getSiteManager()
# if not sm.queryUtility(lovely.remotetask.interfaces.ITaskService, name="FunnelwebService"):
# service = FunnelwebService()
# service.id = "funnelwebservice"
# sm.registerUtility(service,
# lovely.remotetask.interfaces.ITaskService,
# name="FunnelwebService")
# portal['funnelwebservice'] = service
class ISiteInstalledEvent(zope.component.interfaces.IObjectEvent):
"""Event, raised after Funnelweb installation"""
class SiteInstalledEvent(zope.component.interfaces.ObjectEvent):
zope.interface.implements(ISiteInstalledEvent)
@adapter(None, ISiteInstalledEvent)
def install_service(portal, event):
pass
def __getitem__(self, key):
key = self.aliases.get(key, key)
context = self.context
request = self.request
for name in self.macro_pages:
page = zope.component.getMultiAdapter(
(context, request), name=name)
try:
v = page[key]
except KeyError:
pass
else:
return v
raise KeyError(key)
def eval_expression(self, expression, bundle_name):
if not expression:
return True
cache = component.queryUtility(ram.IRAMCache)
cooked_expression = None
if cache is not None:
cooked_expression = cache.query(
'plone.bundles.cooked_expressions',
key=dict(prefix=bundle_name),
default=None
)
if (
cooked_expression is None or
cooked_expression.text != expression
):
cooked_expression = Expression(expression)
if cache is not None:
cache.set(
cooked_expression,
'plone.bundles.cooked_expressions',
# relation.memberEnd = tail_end
# head_end.type = head_type
# tail_end.type = tail_type
# head_type.ownedAttribute = tail_end
# tail_type.ownedAttribute = head_end
# Now the subject
# association.subject = relation
# association.head_end.subject = head_end
# association.tail_end.subject = tail_end
# Create the diagram item:
association = self.diagram.create(items.AssociationItem)
adapter = component.queryMultiAdapter((head_type_item, association), IConnect)
assert adapter
handle = association.handles()[0]
adapter.connect(handle)
adapter = component.queryMultiAdapter((tail_type_item, association), IConnect)
assert adapter
handle = association.handles()[-1]
adapter.connect(handle)
# Apply attribute information to the association (ends)
association.head_end.navigability = False
tail_prop = association.tail_end.subject
tail_prop.name = attr.attrname
tail_prop.visibility = self._visibility(attr.attrname)
tail_prop.aggregation = 'composite'
else:
@component.adapter(IElementEvent)
def on_element_event(self, event):
element = event.element
# Cache this?
for c in type(element).__mro__:
# Find registered items for this element
s = self._handlers.get(c)
if not s:
continue
for handler in s:
try:
handler(event)
except Exception, e:
log.error('problem executing handler %s' % handler, e)
def manage_page_header(self, *args, **kw):
"""manage_page_header."""
kw['css_urls'] = itertools.chain(
itertools.chain(*zope.component.subscribers((self,), ICSSPaths)),
self._get_zmi_additionals('zmi_additional_css_paths'))
kw['js_urls'] = itertools.chain(
itertools.chain(*zope.component.subscribers((self,), IJSPaths)),
self._get_zmi_additionals('zmi_additional_js_paths'))
return self._manage_page_header(*args, **kw)
def loadTabsForCurrentItem(self, item):
"""Load all tabs that can operate on the given item."""
for name, adapter in component.getAdapters(
[item,], IDetailsPage):
self.prepend_page(adapter, gtk.Label(name))
self.show_all()
@component.adapter(IAttachmentStoragable)
def AttachmentStorageAdapterFactory(content):
""" Adapter factory to fetch the attachment storage from annotations.
"""
annotations = IAnnotations(content)
if ANNOTATION_KEY not in annotations:
attachments = AttachmentStorage()
attachments.__parent__ = aq_base(content)
annotations[ANNOTATION_KEY] = attachments
else:
attachments = annotations[ANNOTATION_KEY]
if isinstance(content, ExtensionClass.Base):
return attachments.__of__(content)
else:
return attachments
def get_certnames(config, verb, allow_multiple=False, custom_prompt=None):
"""Get certname from flag, interactively, or error out.
"""
certname = config.certname
if certname:
certnames = [certname]
else:
disp = zope.component.getUtility(interfaces.IDisplay)
filenames = storage.renewal_conf_files(config)
choices = [storage.lineagename_for_filename(name) for name in filenames]
if not choices:
raise errors.Error("No existing certificates found.")
if allow_multiple:
if not custom_prompt:
prompt = "Which certificate(s) would you like to {0}?".format(verb)
else:
prompt = custom_prompt
code, certnames = disp.checklist(
prompt, choices, cli_flag="--cert-name", force_interactive=True)
if code != display_util.OK:
raise errors.Error("User ended interaction.")
else:
if not custom_prompt:
prompt = "Which certificate would you like to {0}?".format(verb)