Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def _getPropertiesForVariants(self, cart_item):
"""
"""
u = getUtility(INumberConverter)
cm = ICurrencyManagement(self.context)
variant = cart_item.getProduct()
product = variant.aq_inner.aq_parent
selected_options = {}
for property in variant.getForProperties():
name, value = property.split(":")
selected_options[name] = value
pm = IPropertyManagement(product)
result = []
for property in pm.getProperties():
# Only properties with at least one option are displayed.
def reply(self):
    """Update existing registry records from the JSON request body.

    The request ``BODY`` is parsed as a JSON object whose items are
    written to the ``IRegistry`` utility one by one. Only keys that are
    already present in the registry are accepted.

    :raises NotImplementedError: when a key is not found in the registry.
        Keys processed before the unknown one have already been assigned
        at that point.
    :returns: ``None``; the response status is set to 204.
    """
    payload = json.loads(self.request.get('BODY', '{}'))
    records = getUtility(IRegistry)

    # Disable CSRF protection
    protect_interfaces = plone.protect.interfaces
    if 'IDisableCSRFProtection' in dir(protect_interfaces):
        alsoProvides(
            self.request, protect_interfaces.IDisableCSRFProtection)

    for key, value in payload.items():
        if key in records:
            records[key] = value
            continue
        # Unknown key: this endpoint never creates records.
        raise NotImplementedError(
            "This endpoint is only intended to update existing "
            "records! Couldn't find key %r" % key)

    self.request.response.setStatus(204)
    return None
def _report_failed_challs(failed_achalls):
    """Notifies the user about failed challenges.

    Challenges that carry an error are grouped by ``error.typ``; one
    message per group is added to the ``IReporter`` utility with
    ``MEDIUM_PRIORITY``. Challenges without an error are ignored.

    :param set failed_achalls: A set of failed
        :class:`letsencrypt.achallenges.AnnotatedChallenge`.

    """
    problems = dict()
    for achall in failed_achalls:
        if achall.error:
            problems.setdefault(achall.error.typ, []).append(achall)

    reporter = zope.component.getUtility(interfaces.IReporter)
    # dict.values() is available on both Python 2 and Python 3, whereas
    # dict.itervalues() only exists on Python 2.
    for achalls in problems.values():
        reporter.add_message(
            _generate_failed_chall_msg(achalls), reporter.MEDIUM_PRIORITY)
def configs(self):
    """Return the registry settings looked up for ``ISocialLikeSettings``."""
    # check=False: do not fail if the upgrade step has not been run
    return getUtility(IRegistry).forInterface(
        ISocialLikeSettings, check=False)
def get_users():
    """Synchronise ``/home`` with the principals of the auth utility.

    For every principal whose exact type is ``User``: a ``UserProfile`` is
    added to ``home`` when no entry with that id is listed; otherwise the
    stored profile's ``uid`` and ``groups`` are updated when they differ
    from the principal's. Nothing is returned.
    """
    home = db.get_root()['oms_root']['home']
    auth = getUtility(IAuthentication)

    for _name, principal in auth.principals.iteritems():
        # Exact type match only; other principal kinds are skipped.
        if type(principal) is not User:
            continue

        if principal.id in home.listnames():
            # Already known: refresh whatever drifted from the principal.
            profile = home[principal.id]
            if principal.uid != profile.uid:
                profile.uid = principal.uid
            if principal.groups != profile.groups:
                profile.groups = principal.groups
            continue

        profile = UserProfile(
            principal.id, principal.groups, uid=principal.uid)
        log.msg('Adding %s to /home' % profile)
        home.add(profile)
def getConfig():
    """Return the ``ISolrSchema`` settings stored under 'collective.solr'."""
    solr_settings = getUtility(IRegistry).forInterface(
        ISolrSchema, prefix='collective.solr')
    return solr_settings
def nextURL(self):
    """Return the URL of the context, with '/view' appended when needed.

    '/view' is appended only when ``self.portal_type`` is set and is
    contained in the 'plone.types_use_view_action_in_listings' registry
    value. Without a ``portal_type`` the registry is not consulted.
    """
    url = self.context.absolute_url()
    portal_type = getattr(self, 'portal_type', None)
    if portal_type is None:
        return url

    view_action_types = getUtility(IRegistry).get(
        'plone.types_use_view_action_in_listings', [])
    if portal_type in view_action_types:
        return url + '/view'
    return url
return
uniqueKey = schema.get('uniqueKey', None)
if uniqueKey is None:
msg = 'schema is missing unique key, skipping indexing of %r'
logger.warning(msg, obj)
return
if attributes is not None:
attributes = set(schema.keys()).intersection(attributes)
if not attributes:
return
data, missing = self.getData(obj)
if not data:
return # don't index with no data...
prepareData(data)
if data.get(uniqueKey, None) is not None and not missing:
config = getUtility(ISolrConnectionConfig)
if config.commit_within:
data['commitWithin'] = config.commit_within
try:
logger.debug('indexing %r (%r)', obj, data)
pt = data.get('portal_type', 'default')
logger.debug(
'indexing %r with %r adder (%r)', obj, pt, data)
adder = queryAdapter(obj, ISolrAddHandler, name=pt)
if adder is None:
adder = DefaultAdder(obj)
adder(conn, boost_values=boost_values(obj, data), **data)
except (SolrException, error):
logger.exception('exception during indexing %r', obj)
def decorate(mlist, uri, extradict=None):
"""Expand the decoration template."""
if uri is None:
return ''
# Get the decorator template.
loader = getUtility(ITemplateLoader)
template_uri = expand(uri, dict(
listname=mlist.fqdn_listname,
language=mlist.preferred_language.code,
))
template = loader.get(template_uri)
# Create a dictionary which includes the default set of interpolation
# variables allowed in headers and footers. These will be augmented by
# any key/value pairs in the extradict.
substitutions = dict(
fqdn_listname = mlist.fqdn_listname,
list_name = mlist.list_name,
host_name = mlist.mail_host,
display_name = mlist.display_name,
listinfo_uri = mlist.script_url('listinfo'),
list_requests = mlist.request_address,
description = mlist.description,
for vhost in self.vhosts:
all_names.update(vhost.get_names())
if vhost.modmacro:
vhost_macro.append(vhost.filep)
for addr in vhost.addrs:
if common.hostname_regex.match(addr.get_addr()):
all_names.add(addr.get_addr())
else:
name = self.get_name_from_ip(addr)
if name:
all_names.add(name)
if len(vhost_macro) > 0:
zope.component.getUtility(interfaces.IDisplay).notification(
"Apache mod_macro seems to be in use in file(s):\n{0}"
"\n\nUnfortunately mod_macro is not yet supported".format(
"\n ".join(vhost_macro)))
return all_names