Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_push_pyrsistent(benchmark):
    """Benchmark push() against an initially empty pyrsistent vector."""
    empty = pyrsistent.pvector()
    benchmark(push, empty)
def run_len():
# NOTE(review): indentation appears stripped in this chunk; code lines are kept
# byte-identical. Everything from the "double" loop onwards references an
# `iterations` name that is never defined in this function -- it looks like
# lines from a sibling benchmark were spliced in here. TODO confirm against
# the original source.
# This is quite close to the python function call overhead baseline since the function
# itself hardly does anything. That is the only reason this test is interesting.
v = pvector()
r = 1000000
# Time one million len() calls on an empty persistent vector.
before = time.time()
for _ in range(r):
len(v)
len_duration = time.time() - before
print("Len: %s s, per call %s s" % (len_duration, len_duration / r))
# Time an empty loop of the same length to estimate the pure loop overhead.
before = time.time()
for _ in range(r):
pass
empty_duration = time.time() - before
print("Empty loop: %s s, per call %s s" % (empty_duration, empty_duration / r))
# Subtract the loop overhead to estimate the cost of len() itself.
print("Len estimate: %s, per call: %s" % (len_duration - empty_duration, (len_duration - empty_duration) / r))
before = time.time()
# Construct many tiny vectors of different sizes. 32 vs 33 elements presumably
# straddles a node-size boundary in pyrsistent's internal trie -- TODO confirm.
double = [2, 3]
for _ in iterations:
vec = pvector(double)
print("Many small Double elements: " + str(time.time() - before))
before = time.time()
ten = range(10)
for _ in iterations:
vec = pvector(ten)
print("Many small Ten elements: " + str(time.time() - before))
before = time.time()
x = range(32)
for _ in iterations:
vec = pvector(x)
print("Many small 32 elements: " + str(time.time() - before))
before = time.time()
x = range(33)
for _ in iterations:
vec = pvector(x)
print("Many small 33 elements: " + str(time.time() - before))
def run_big_iterator_initialization():
"""
Benchmark building a pvector from a one-million-element generator.

The results are comparable to those of doing it with a list since most of the
code is shared.
"""
# Creating the generator itself is lazy, so this first timing should be
# near zero; it is printed mainly as a sanity baseline.
before = time.time()
iterator = (x for x in range(1000000))
print("Big iterator: " + str(time.time() - before))
# Consuming the generator into a persistent vector is the measured cost.
before = time.time()
seq = pvector(iterator)
print("Big vector from iterator: " + str(time.time() - before))
# NOTE(review): this span appears to be the tail of a random-access benchmark;
# `random_access`, the large collection `l` and the initial `before` timestamp
# are defined above the visible chunk -- TODO confirm.
random_access(l)
print("Random access large list: " + str(time.time() - before))
v = pvector(l)
before = time.time()
random_access(v)
print("Random access large vector: " + str(time.time() - before))
# Small-collection variant below; `testdata` is presumably the index pattern
# used by random_access -- verify against its definition.
testdata = [0, 4, 17, -2, 3, 7, 8, 11, 1, 13, 18, 10]
l = range(20)
before = time.time()
random_access(l)
print("Random access small list: " + str(time.time() - before))
v = pvector(l)
before = time.time()
random_access(v)
print("Random access small vector: " + str(time.time() - before))
the server is on the ServiceNet network
:ivar str image_id: The ID of the image the server was launched with
:ivar str flavor_id: The ID of the flavor the server was launched with
:ivar PSet desired_lbs: An immutable set of :class:`CLBDescription`
instances describing the desired load balancer configuration.
:var dict json: JSON dict received from Nova from which this server
is created
"""
# attrs-style field declarations; the enclosing class header (NovaServer,
# presumably, per the docstring fragment above) is outside this chunk.
id = attr.ib()
# Validated against the set of permitted server states -- see _validate_state.
state = attr.ib(validator=_validate_state)
created = attr.ib()
image_id = attr.ib()
flavor_id = attr.ib()
# type(pvector()) is pvectorc.PVector, which != pyrsistent.PVector
links = attr.ib(default=attr.Factory(pvector),
validator=instance_of(type(pvector())))
# Desired load balancer configuration; a persistent set per the PSet validator.
desired_lbs = attr.ib(default=attr.Factory(pset),
validator=instance_of(PSet))
# string_types suggests six / Python 2 compatibility -- TODO confirm.
servicenet_address = attr.ib(default='',
validator=instance_of(string_types))
# Raw Nova server-details JSON this object was built from, as a persistent map.
json = attr.ib(default=attr.Factory(pmap), validator=instance_of(PMap))
@classmethod
def from_server_details_json(cls, server_json):
"""
Create a :obj:`NovaServer` instance from a server details JSON
dictionary, although without any 'server' or 'servers' initial resource
key.

See
http://docs.rackspace.com/servers/api/v2/cs-devguide/content/
Get_Server_Details-d1e2623.html

NOTE(review): everything from here down -- the rest of this docstring and
the body below -- appears to belong to a different function (a pyrsistent
diff helper taking ``current_path``, ``subobj_a`` and ``subobj_b``); this
chunk looks spliced together from two sources. TODO confirm against the
originals.

``subobj_b`` assuming that these subobjs are at ``current_path`` inside a
nested pyrsistent object.

:param current_path: An iterable of pyrsistent object describing the path
inside the root pyrsistent object where the other arguments are
located. See ``PMap.transform`` for the format of this sort of path.
:param subobj_a: The desired input sub object.
:param subobj_b: The desired output sub object.
:returns: An iterable of ``_IDiffChange`` s that will turn ``subobj_a``
into ``subobj_b``.
"""
# Equal sub-objects need no diff at all.
if subobj_a == subobj_b:
return pvector([])
# Records and mappings are recursed into field-by-field / key-by-key so the
# resulting diff stays as small as possible.
elif isinstance(subobj_a, PClass) and isinstance(subobj_b, PClass):
a_dict = subobj_a._to_dict()
b_dict = subobj_b._to_dict()
return _create_diffs_for_mappings(current_path, a_dict, b_dict)
elif isinstance(subobj_a, PMap) and isinstance(subobj_b, PMap):
return _create_diffs_for_mappings(
current_path, subobj_a, subobj_b)
elif isinstance(subobj_a, PSet) and isinstance(subobj_b, PSet):
return _create_diffs_for_sets(
current_path, subobj_a, subobj_b)
# If the objects are not equal, and there is no intelligent way to recurse
# inside the objects to make a smaller diff, simply set the current path
# to the object in b.
if len(current_path) > 0:
return pvector([
_Set(
# NOTE(review): the body is truncated mid-expression at this point in
# the chunk.
:return: ``IStateChange`` provider that will run given changes in
parallel, or ``NoOp`` instance if changes are empty or all
``NoOp``. In former case sleep will be ``sleep_when_empty``, in
latter the minimum sleep of the ``NoOp`` instances.
"""
# NOTE(review): the docstring tail above belongs to a function whose ``def``
# line is outside this chunk (``changes`` and ``sleep_when_empty`` are
# presumably its parameters) -- TODO confirm. No comments were inserted
# before the closing quotes since that text is part of the string literal.
# An all-NoOp (or empty) set of changes collapses to a single NoOp whose
# sleep is the minimum of the constituents, or sleep_when_empty when empty.
if all(isinstance(c, NoOp) for c in changes):
sleep = (min(c.sleep for c in changes) if changes
else sleep_when_empty)
return NoOp(sleep=sleep)
return _InParallel(changes=changes)
@implementer(IStateChange)
class _Sequentially(PClass):
    """Run a series of state changes strictly one after another.

    ``changes`` is a ``PVector`` of ``IStateChange`` providers; each change's
    Deferred must fire before the next change is started.
    """
    changes = field(type=PVector, factory=pvector, mandatory=True)

    @property
    def eliot_action(self):
        """Eliot action under which the whole sequence is logged."""
        return LOG_SEQUENTIALLY()

    def run(self, deployer):
        """Chain every change onto a single Deferred so they run in order."""
        d = DeferredContext(succeed(None))
        for change in self.changes:
            # Bind the current change as a default argument so each callback
            # does not close over the final loop value (late-binding trap).
            def _run_next(_, change=change):
                return run_state_change(change, deployer)
            d.addCallback(_run_next)
        return d.result
# NOTE(review): this span is the tail of a type-model registry dict literal
# whose opening brace (and the "date-time" key this first entry presumably
# belongs to) are outside the chunk -- TODO confirm. Python 2 names
# (``unicode``, ``long``) are used here.
python_types=(datetime,),
factory=_parse_iso8601,
serializer=_isoformat,
),
# Swagger's "int-or-string" accepts either a textual or an integral value.
(u"string", u"int-or-string"): _BasicTypeModel(
python_types=(unicode, int, long),
),
# This is not part of Swagger. It's something we can inject into
# specifications to set a constant value on the resulting types.
(u"x-txkube-constant", u"string"): _ConstantModel(python_types=(unicode,)),
}
# pyrsistent field declarations; the enclosing PClass header is outside this
# chunk -- TODO confirm. A named, documented type holding a vector of
# attribute models.
name = field(type=unicode)
doc = field(type=unicode)
attributes = field(type=PVector, factory=pvector)
@classmethod
def _type_model_for_spec(cls, pclass_for_definition, spec):
# Map a Swagger type specification dict onto an internal type model.
if spec.get(u"type") == u"array":
# "array" type definitions represent an array of some other thing.
# Get a model for whatever the nested thing is and put it into an
# array model.
element_type = cls._type_model_for_spec(
pclass_for_definition, spec[u"items"],
)
return _ArrayTypeModel(element_type=element_type)
if spec.get(u"type") == u"object":
# "object" type definitions represent a mapping from unicode to
# some other thing. Get a model for whatever the values are
# supposed to be and put that into a mapping model.
# NOTE(review): the method body is truncated here; the chunk continues
# below with code from an unrelated msgpack decoder -- TODO confirm.
# NOTE(review): from here on the chunk carries the tail of a msgpack-based
# ``decode`` helper; ``obj``, ``decode``, ``decode_func`` and the TYPE_*
# extension codes are defined outside this chunk -- TODO confirm.
unpacked_data = unpackb(obj.data,
use_list=False,
encoding='utf-8')
# Bags are serialized as a sequence of individually-encoded items.
return pbag(decode(item) for item in unpacked_data)
if obj.code == TYPE_FUNC:
return decode_func(obj.data)
# Generic ExtType payload: (module name, class name, *constructor args).
module_name, class_name, *data = unpackb(obj.data,
use_list=False,
encoding='utf-8')
cls = getattr(sys.modules[module_name],
class_name)
if obj.code == TYPE_MBOX:
# Mailbox classes decode their own payload via a classmethod.
return cls.decode(data)
return cls(*(decode(item) for item in data))
# Plain containers are rebuilt recursively into persistent equivalents.
if isinstance(obj, tuple):
return pvector(decode(item) for item in obj)
if isinstance(obj, dict):
new_dict = dict()
for key in obj.keys():
new_dict[decode(key)] = decode(obj[key])
return pmap(new_dict)
# Scalars (and anything unrecognized) pass through unchanged.
return obj