Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
Once the job reaches `TERMINATED` state, the return code
(stored also in `.returncode`) is returned; if the job is not
yet in `TERMINATED` state, calling `progress` returns `None`.
:raises: exception :class:`UnexpectedStateError` if the
associated job goes into state `STOPPED` or `UNKNOWN`
:return: final returncode, or `None` if the execution
state is not `TERMINATED`.
"""
# first update state, we'll submit NEW jobs last, so that the
# state is not updated immediately after submission as ARC
# does not cope well with this...
if self.execution.state in [Run.State.SUBMITTED,
Run.State.RUNNING,
Run.State.STOPPED,
Run.State.UNKNOWN]:
self.update_state()
# now "do the right thing" based on actual state
if self.execution.state in [Run.State.STOPPED,
Run.State.UNKNOWN]:
raise gc3libs.exceptions.UnexpectedStateError(
"Task '%s' entered `%s` state." % (self, self.execution.state))
elif self.execution.state == Run.State.NEW:
self.submit()
elif self.execution.state == Run.State.TERMINATING:
self.fetch_output()
return self.execution.returncode
def fset(self, value):
assert value in Run.State, \
("Value '%s' is not a legal `gc3libs.Run.State` value." % value)
if self._state != value:
self.state_last_changed = time.time()
self.timestamp[value] = time.time()
self.history.append(value)
if self._ref is not None:
# mark as changed
self._ref.changed = True
# invoke state-transition method
handler = value.lower()
gc3libs.log.debug(
"Calling state-transition handler '%s' on %s ..."
% (handler, self._ref))
getattr(self._ref, handler)()
self._state = value
return locals()
def __init__(self, **extra_args):
"""
Initialize a `Task` instance.
The following attributes are defined on a valid `Task` instance:
* `execution`: a `gc3libs.Run`:class: instance
:param grid: A :class:`gc3libs.Engine` or
:class:`gc3libs.Core` instance, or anything
implementing the same interface.
"""
Persistable.__init__(self, **extra_args)
Struct.__init__(self, **extra_args)
self.execution = Run(attach=self)
# `_controller` and `_attached` are set by `attach()`/`detach()`
self._attached = False
self._controller = None
self.changed = True
gc3libs.quantity.Memory)), \
("Expected `Memory` instance for `requested_memory,"
" got %r %s instead."
% (self.requested_memory, type(self.requested_memory)))
self.requested_walltime = extra_args.pop('requested_walltime', None)
assert (self.requested_walltime is None
or isinstance(self.requested_walltime,
gc3libs.quantity.Duration)), \
("Expected `Duration` instance for `requested_walltime,"
" got %r %s instead."
% (self.requested_memory, type(self.requested_memory)))
self.requested_architecture = extra_args.pop(
'requested_architecture', None)
if self.requested_architecture is not None \
and self.requested_architecture not in [
Run.Arch.X86_32,
Run.Arch.X86_64]:
raise gc3libs.exceptions.InvalidArgument(
"Architecture must be either '%s' or '%s'"
% (Run.Arch.X86_32, Run.Arch.X86_64))
self.environment = dict(
(str(k), str(v)) for k, v in extra_args.pop(
'environment', dict()).items())
self.join = extra_args.pop('join', False)
self.stdin = extra_args.pop('stdin', None)
if self.stdin and (self.stdin not in self.inputs):
self.inputs[self.stdin] = os.path.basename(self.stdin)
self.stdout = extra_args.pop('stdout', None)
if self.stdout is not None and os.path.isabs(self.stdout):
raise gc3libs.exceptions.InvalidArgument(
self.output_base_url = extra_args.pop('output_base_url', None)
self.requested_cores = int(extra_args.pop('requested_cores', 1))
self.requested_memory = extra_args.pop('requested_memory', None)
assert (self.requested_memory is None
or isinstance(self.requested_memory, gc3libs.quantity.Memory)), \
("Expected `Memory` instance for `requested_memory, got %r %s instead."
% (self.requested_memory, type(self.requested_memory)))
self.requested_walltime = extra_args.pop('requested_walltime', None)
assert (self.requested_walltime is None
or isinstance(self.requested_walltime, gc3libs.quantity.Duration)), \
("Expected `Duration` instance for `requested_walltime, got %r %s instead."
% (self.requested_memory, type(self.requested_memory)))
self.requested_architecture = extra_args.pop('requested_architecture', None)
if self.requested_architecture is not None \
and self.requested_architecture not in [ Run.Arch.X86_32, Run.Arch.X86_64 ]:
raise gc3libs.exceptions.InvalidArgument(
"Architecture must be either '%s' or '%s'"
% (Run.Arch.X86_32, Run.Arch.X86_64))
self.environment = extra_args.pop('environment', dict())
self.environment = dict(Application._to_env_pair(x)
for x in self.environment.items())
self.join = extra_args.pop('join', False)
self.stdin = extra_args.pop('stdin', None)
if self.stdin and (self.stdin not in self.inputs):
self.inputs[self.stdin] = os.path.basename(self.stdin)
self.stdout = extra_args.pop('stdout', None)
if self.stdout is not None and os.path.isabs(self.stdout):
raise InvalidArgument(
"Absolute path '%s' passed as `Application.stdout`"
def __init__(self, **extra_args):
"""
Initialize a `Task` instance.
The following attributes are defined on a valid `Task` instance:
* `execution`: a `gc3libs.Run`:class: instance
:param grid: A :class:`gc3libs.Engine` or
:class:`gc3libs.Core` instance, or anything
implementing the same interface.
"""
Persistable.__init__(self, **extra_args)
Struct.__init__(self, **extra_args)
self.execution = Run(attach=self)
# `_controller` and `_attached` are set by `attach()`/`detach()`
self._attached = False
self._controller = None
self.changed = True