Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
Parameters
==========
tag: the text that comes after deid to indicate the tag of the file in deid/data
exit_on_fail: if None is an acceptable return value, this should be set to False
(default is True).
quiet: Default False. If None is acceptable, quiet can be set to True
load: also load the deid, if resulting path (from path or tag) is not None
"""
# no tag/path means load default
if tag is None:
tag = "dicom"
# If it's already loaded
if isinstance(tag, dict):
bot.debug("deid is already loaded.")
return tag
# If it's a path, get full path
if os.path.exists(tag):
deid = os.path.abspath(tag)
else:
deid = "%s/deid.%s" % (data_base, tag)
if not os.path.exists(deid):
if quiet is False:
bot.error("Cannot find %s" % (deid))
if exit_on_fail is True:
sys.exit(1)
else:
return None
for contender in contenders:
if os.path.isdir(contender):
dicom_files = recursive_find(contender, pattern=pattern)
else:
dicom_files = [contender]
for dicom_file in dicom_files:
if dicom_file is not None:
if check:
validated_files = validate_dicoms(dicom_file, force=force)
else:
validated_files = [dicom_file]
for validated_file in validated_files:
bot.debug("Found contender file %s" % (validated_file))
yield validated_file
def _get_clean_name(self, output_folder, extension="dcm"):
    """return a full path to an output file, with custom folder and
    extension. If the output folder isn't yet created, make it
    (including any missing parent folders).

    Parameters
    ==========
    output_folder: the output folder to write to. Falls back to
                   self.output_folder when None, and is created if it
                   doesn't exist.
    extension: the extension of the file to create a name for, should
               not start with "."

    Returns
    =======
    the string "<output_folder>/cleaned-<basename>.<extension>", where
    basename is the file name of self.dicom_file with a trailing
    ".dcm" or ".dicom" removed.
    """
    if output_folder is None:
        output_folder = self.output_folder

    if not os.path.exists(output_folder):
        bot.debug("Creating output folder %s" % output_folder)
        # makedirs also builds missing parent folders, and exist_ok
        # tolerates the folder appearing between the check and the call
        os.makedirs(output_folder, exist_ok=True)

    # Anchor the pattern to the end of the name so that only the real
    # extension is stripped, not a ".dcm" / ".dicom" found mid-name
    basename = re.sub(
        r"[.](?:dicom|dcm)$", "", os.path.basename(self.dicom_file)
    )
    return "%s/cleaned-%s.%s" % (output_folder, basename, extension)
# Actions that can optionally have a value
elif action in ["REMOVE"]:
bot.debug("%s: adding %s" % (section, line))
# Case 1: removing without any criteria
if len(parts) == 0:
config[section].append({"action": action, "field": field})
# Case 2: REMOVE can have a func:is_thing to return boolean
else:
value = _remove_comments(parts)
config[section].append({"action": action, "field": field, "value": value})
# Actions that don't require a value
elif action in ["BLANK", "KEEP"]:
bot.debug("%s: adding %s" % (section, line))
config[section].append({"action": action, "field": field})
return config
def jitter_timestamp(field, value, item):
    '''shift the timestamp stored under "field" in item by a number
       of days, when the field is present.

       Parameters
       ==========
       field: the key in item holding the timestamp to jitter
       value: the number of days to shift by (converted with to_int);
              can be positive or negative
       item: the object to update; it is read with .get() and
             written back by key

       Returns
       =======
       the same item, with item[field] replaced by the jittered
       timestamp (formatted as %Y%m%d) if the field held a value,
       otherwise unchanged.
    '''
    days = to_int(value)
    current = item.get(field, None)

    # Nothing to jitter when the field is absent
    if current is None:
        return item

    shifted = get_timestamp(item_date=current, jitter_days=days, format="%Y%m%d")
    bot.debug("JITTER %s + (%s): %s" % (current, days, shifted))
    item[field] = shifted
    return item
dicom_files, save=True, overwrite=False, output_folder=None, force=True
):
"""remove_private_identifiers is a wrapper for the
simple call to dicom.remove_private_tags, it simply
reads in the files for the user and saves accordingly
"""
updated_files = []
if not isinstance(dicom_files, list):
dicom_files = [dicom_files]
for dicom_file in dicom_files:
dicom = read_file(dicom_file, force=force)
dicom.remove_private_tags()
dicom_name = os.path.basename(dicom_file)
bot.debug("Removed private identifiers for %s" % dicom_name)
if save:
dicom = save_dicom(
dicom=dicom,
dicom_file=dicom_file,
output_folder=output_folder,
overwrite=overwrite,
)
updated_files.append(dicom)
return updated_files
for tag in taglist:
with tag_in_exception(tag):
if tag in ds:
try:
data_element = ds[tag]
if data_element.tag.is_private:
bot.debug(data_element.name)
private_tags.append(data_element)
if tag in ds and data_element.VR == "SQ":
sequence = data_element.value
for dataset in sequence:
datasets.append(dataset)
except IndexError:
bot.debug("tag %s key present without value" % tag)
except NotImplementedError:
bot.debug("tag %s is invalid, skipping" % tag)
return private_tags
deid: the already loaded deid, with a header section with
actions to specify how to clean
'''
# Keep track of the fields we've seen, not to blank them
seen = []
for action in deid['header']:
item,fields = perform_action(item=item,
action=action,
return_seen=True)
seen = seen + [f for f in fields if f not in seen]
remaining = [x for x in item.keys() if x not in seen]
# Apply default action to remaining fields
if len(remaining) > 0 and default != "KEEP":
bot.debug("%s fields set for default action %s" %(len(remaining),default))
for field in remaining:
action = {'action': default, "field":field}
item = perform_action(item=item, action=action)
return item
if not isinstance(dcm_files, list):
dcm_files = [dcm_files]
valids = []
bot.debug("Checking %s dicom files for validation." % (len(dcm_files)))
for dcm_file in dcm_files:
try:
with open(dcm_file, "rb") as filey:
read_file(filey, force=force)
valids.append(dcm_file)
except:
bot.warning("Cannot read input file {0!s}, skipping.".format(dcm_file))
bot.debug("Found %s valid dicom files" % (len(valids)))
return valids