Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
args, config_override=config_override)
logger.debug(u' '.join(command_args))
env = os.environ.copy()
if self.taskrc_location:
env['TASKRC'] = self.taskrc_location
p = subprocess.Popen(command_args, stdout=subprocess.PIPE,
stderr=subprocess.PIPE, env=env)
stdout, stderr = [x.decode('utf-8') for x in p.communicate()]
if p.returncode and allow_failure:
if stderr.strip():
error_msg = stderr.strip()
else:
error_msg = stdout.strip()
error_msg += u'\nCommand used: ' + u' '.join(command_args)
raise TaskWarriorException(error_msg)
# Return the whole triplet only if explicitly asked for
if not return_all:
return stdout.rstrip().split('\n')
else:
return (stdout.rstrip().split('\n'),
stderr.rstrip().split('\n'),
p.returncode)
"""Save a task into TaskWarrior database using add/modify call"""
args = [task['uuid'], 'modify'] if task.saved else ['add']
args.extend(self._get_modified_task_fields_as_args(task))
output = self.execute_command(args)
# Parse out the new ID, if the task is being added for the first time
if not task.saved:
id_lines = [l for l in output if l.startswith('Created task ')]
# Complain loudly if it seems that more tasks were created
# Should not happen.
# Expected output: Created task 1.
# Created task 1 (recurrence template).
if len(id_lines) != 1 or len(id_lines[0].split(' ')) not in (3, 5):
raise TaskWarriorException(
'Unexpected output when creating '
'task: %s' % '\n'.join(id_lines),
)
# Circumvent the ID storage, since ID is considered read-only
identifier = id_lines[0].split(' ')[2].rstrip('.')
# Identifier can be either ID or UUID for completed tasks
try:
task._data['id'] = int(identifier)
except ValueError:
task._data['uuid'] = identifier
# Refreshing is very important here, as not only is the modification
# time updated, but arbitrary attributes may have changed due to hooks
# altering the data before saving
# Make a copy, removing ID and UUID. It's most likely invalid
# (ID 0) if it failed to match a unique task.
data = copy.deepcopy(task._data)
data.pop('id', None)
data.pop('uuid', None)
taskfilter = self.filter_class(self)
for key, value in data.items():
taskfilter.add_filter_param(key, value)
output = self.execute_command(['export'] +
taskfilter.get_filter_params())
# If more than 1 task has been matched still, raise an exception
if not valid(output):
raise TaskWarriorException(
'Unique identifiers {0} with description: {1} matches '
'multiple tasks: {2}'.format(
task['uuid'] or task['id'], task['description'], output)
)
return json.loads(output[0])
def filter_tasks(self, filter_obj):
    """Export the tasks selected by ``filter_obj`` and return them as a list.

    ``self.enforce_recurrence()`` is invoked first, then the ``export``
    command is executed with the parameters supplied by
    ``filter_obj.get_filter_params()``. Every non-empty output line is
    stripped of surrounding commas, parsed as JSON and loaded into a new
    ``Task`` bound to this backend.

    Raises ``TaskWarriorException`` when a ``ValueError`` occurs while
    turning a line into a task (for instance, ``json.loads`` rejecting
    the text).
    """
    self.enforce_recurrence()
    export_args = ['export'] + filter_obj.get_filter_params()

    matched = []
    for raw_line in self.execute_command(export_args):
        # Empty lines carry no task data.
        if not raw_line:
            continue

        # Drop the commas surrounding the JSON text on the line.
        payload = raw_line.strip(',')
        try:
            loaded = Task(self)
            loaded._load_data(json.loads(payload))
            matched.append(loaded)
        except ValueError:
            raise TaskWarriorException('Invalid JSON: %s' % payload)

    return matched