Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
class IDate(IDateClass):
"""Represents a date (year, month and day) in an idealized calendar.
Implemented by `datetime.date`.
Operators:
__repr__, __str__
__cmp__, __hash__
__add__, __radd__, __sub__ (add/radd only with timedelta arg)
"""
year = Attribute("Between MINYEAR and MAXYEAR inclusive.")
month = Attribute("Between 1 and 12 inclusive")
day = Attribute(
"Between 1 and the number of days in the given month of the given year.")
def replace(year, month, day):
"""Return a date with the same value.
Except for those members given new values by whichever keyword
arguments are specified. For example, if ``d == date(2002, 12, 31)``, then
``d.replace(day=26) == date(2000, 12, 26)``.
"""
def timetuple():
"""Return a 9-element tuple of the form returned by `time.localtime`.
The hours, minutes and seconds are 0, and the DST flag is -1.
/ / / /
the ---+ / the ---+ /
drain ----+ drain ----+
=========> direction of flow =========>
(Image credit Nam Nguyen)
"""
inputType = Attribute(
"""
The type expected to be received as input to received.
"""
)
outputType = Attribute(
"""
The type expected to be sent as output to C{tube.deliver}.
"""
)
tube = Attribute(
"""
A reference to a L{Tube}. This will be set externally.
""")
def started():
"""
The flow of items has started. C{received} may be called at any point
after this.
"""
'''Interface implemented by the documents holding persitent state of the
agent'''
shard = Attribute('C{unicode} Shard the agent runs in')
instance_id = Attribute('C{int} counter of agent incarnation')
resources = Attribute('C{dict} name -> IAllocatedResource '
'resources allocated by HA for this agent')
under_restart = Attribute('C{bool} flag saying that the agent is '
'being restarted right now my the monitor')
partners = Attribute('C{list} of IPartner')
class IPartner(Interface):
recipient = Attribute('IRecipient of the agent on the other side')
allocation_id = Attribute('C{int} id of the allocation representing this '
'partnership (or None)')
role = Attribute('C{unicode} optional role identifier')
def initiate(agent):
"""After returning a synchronous result or when the returned fiber
is finished the partner is stored to descriptor."""
def on_shutdown(agent):
pass
def on_goodbye(agent, brothers):
'''
Called when the partner goes through the termination procedure.
@param brothers: The list of the partner of the same class
of the agent.
def generate_new_token(self):
"""
Get a new unique token
"""
def valid(self, token_id):
"""
Is the token for given id still valid
"""
class IToken(Interface):
""" Interface for Token class
"""
id = Attribute(_('The UUID of this token'))
uses = Attribute(_('The number of uses for this token before it expires'))
expiry = Attribute(_('The datetime this token expires'))
from zope.i18nmessageid import MessageFactory
_ = MessageFactory('ploneintranet.activitystream')
class IPloneIntranetActivitystreamLayer(Interface):
"""Marker interface to define ZTK browser layer"""
class IActivityProvider(IContentProvider, IActivity):
"""Helper to render IActivity"""
author_home_url = Attribute("author home url")
user_data = Attribute("author memberinfo dict")
user_portrait = Attribute("author portrait url")
date = Attribute("formatted datetime")
# + all the IActivity accessors
class IActivityReplyProvider(IActivityProvider):
""" Render IActivityReply """
class IStreamProvider(IContentProvider):
"""Helper to render activity streams"""
portlet_data = Attribute(
"Optional slot for IActivitystreamPortlet data access")
tag = Attribute("Optional tag to filter on")
userid = Attribute("Optional userid to filter on")
i.e., ?foo=bar&foo=baz&quux=spam results in
{'foo': ['bar', 'baz'], 'quux': ['spam']}.
@ivar requestHeaders: Header fields received in the request.
@ivar received_headers: DEPRECATED: All received headers.
"""
method = Attribute("The HTTP method that was used.")
uri = Attribute("The full URI that was requested (includes arguments).")
path = Attribute("The path only (arguments not included).")
prepath = Attribute("Path segments that have already been handled.")
postpath = Attribute("Path segments still to be handled.")
args = Attribute("All of the arguments, including URL and POST arguments.")
requestHeaders = Attribute(
"A L{http_headers.Headers} instance giving all received HTTP request "
"header fields.")
deferred = Attribute("Fired once request processing is finished.")
client = Attribute("The client that sent this request.")
content = Attribute("File-like object containing the request body.")
# DEPRECATED:
received_headers = Attribute("DEPRECATED: All received headers.")
# Methods for received request
def getHeader(key):
"""Get a header that was sent from the network.
Return C{None} if the header is not present.
"""
def getCookie(key):
"""Get a cookie that was sent from the network.
"""
from feat.common import error
from zope.interface import Interface, Attribute
class IActivity(Interface):
description = Attribute("C{unicode} Explanation of the activity")
started = Attribute("C{bool} flag saying if the activity is already "
"started")
done = Attribute("C{bool} flag saying if the activity is "
"finished and can be removed")
busy = Attribute("C{bool} flag saying if this activity should count "
"towards not being idle. Nonbusy calls are also "
"cancelled during the terminate() call.")
def cancel():
'''Stops/cancels the activity.'''
def notify_finish():
'''Returns a Deferred which will fire after this activity finished.'''
class IActivityManager(Interface):
"""
Interface implemented by the classes tracking their activity.
"""
class INodeFactory(Interface):
"""
Create nodes
"""
def __call__(id, name, state, public_ip, private_ip, driver):
"""
Set values for ivars, including any other requisite kwargs
"""
class INodeSize(Interface):
"""
A machine image
"""
id = Attribute("""Unique ID provided by the provider (m1.small, etc)""")
name = Attribute("""Name provided by the provider (Small CPU, etc)""")
ram = Attribute("""Amount of RAM provided in MB (256MB, 1740MB)""")
disk = Attribute("""Amount of disk provided in GB (200GB)""")
bandwidth = Attribute("""Amount of total transfer bandwidth in GB""")
price = Attribute("""Hourly price of this server in USD, estimated if
monthly""")
driver = Attribute("""The NodeDriver that belongs to this Image""")
class INodeSizeFactory(Interface):
"""
Create nodes
"""
def __call__(id, name, ram, disk, bandwidth, price, driver):
"""
Set values for ivars, including any other requisite kwargs
"""
def getOrdersForCustomer(customer_id):
"""Returns orders for customer with given id.
"""
class IOrderItem(Interface):
"""Marker interface to mark order item content objects.
"""
class IOrdersContainer(Interface):
"""An marker interface for containers which hold orders.
"""
class IOrderSubmitted(Interface):
"""An event fired when an order has been submitted.
"""
context = Attribute("The order that has been submitted")
class IOrderPayed(Interface):
"""An event fired when an order has been payed.
"""
context = Attribute("The order that has been payed")
class IOrderSent(Interface):
"""An event fired when an order has been sent
"""
context = Attribute("The order that has been sent")
class IOrderClosed(Interface):
"""An event fired when an order has been closed
"""
context = Attribute("The order that has been closed")
from zope import interface
class IModule(interface.Interface):
name = interface.Attribute('Name of this module.')
script_url = interface.Attribute('Plugin script URL.')
def get_environment(self):
"""
Return a dict of items to add to the verb environment.
"""
def handle_message(self, msg):
"""
Handle a message generated by the verb environment.
"""
def get_commands(self):
"""
Return a dict of WorldTransaction/amp.Command classes provided by this module.