Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_too_many_columns(self):
self.mock_file.write(u"1 2 3 4 5 6 7 8 9")
self.mock_file.seek(0)
with self.assertRaises(file_interface.FileInterfaceException):
file_interface.read_tum_trajectory_file(self.mock_file)
def load_transform_json(json_path):
"""
load a transformation stored in xyz + quaternion format in a .json file
:param json_path: path to the .json file
:return: t (SE(3) matrix)
"""
with open(json_path, 'r') as tf_file:
data = json.load(tf_file)
keys = ("x", "y", "z", "qx", "qy", "qz", "qw")
if not all(key in data for key in keys):
raise FileInterfaceException(
"invalid transform file - expected keys " + str(keys))
xyz = np.array([data["x"], data["y"], data["z"]])
quat = np.array([data["qw"], data["qx"], data["qy"], data["qz"]])
t = lie.se3(lie.so3_from_se3(tr.quaternion_matrix(quat)), xyz)
return t
def read_kitti_poses_file(file_path):
"""
parses pose file in KITTI format (first 3 rows of SE(3) matrix per line)
:param file_path: the trajectory file path (or file handle)
:return: trajectory.PosePath3D
"""
raw_mat = csv_read_matrix(file_path, delim=" ", comment_str="#")
error_msg = ("KITTI pose files must have 12 entries per row "
"and no trailing delimiter at the end of the rows (space)")
if len(raw_mat) > 0 and len(raw_mat[0]) != 12:
raise FileInterfaceException(error_msg)
try:
mat = np.array(raw_mat).astype(float)
except ValueError:
raise FileInterfaceException(error_msg)
# yapf: disable
poses = [np.array([[r[0], r[1], r[2], r[3]],
[r[4], r[5], r[6], r[7]],
[r[8], r[9], r[10], r[11]],
[0, 0, 0, 1]]) for r in mat]
# yapf: enable
if not hasattr(file_path, 'read'): # if not file handle
logger.debug("Loaded {} poses from: {}".format(len(poses), file_path))
return PosePath3D(poses_se3=poses)
def csv_read_matrix(file_path, delim=',', comment_str="#"):
"""
directly parse a csv-like file into a matrix
:param file_path: path of csv file (or file handle)
:param delim: delimiter character
:param comment_str: string indicating a comment line to ignore
:return: 2D list with raw data (string)
"""
if hasattr(file_path, 'read'): # if file handle
generator = (line for line in file_path
if not line.startswith(comment_str))
reader = csv.reader(generator, delimiter=delim)
mat = [row for row in reader]
else:
if not os.path.isfile(file_path):
raise FileInterfaceException("csv file " + str(file_path) +
" does not exist")
skip_3_bytes = has_utf8_bom(file_path)
with open(file_path) as f:
if skip_3_bytes:
f.seek(3)
generator = (line for line in f
if not line.startswith(comment_str))
reader = csv.reader(generator, delimiter=delim)
mat = [row for row in reader]
return mat
def read_tum_trajectory_file(file_path):
"""
parses trajectory file in TUM format (timestamp tx ty tz qx qy qz qw)
:param file_path: the trajectory file path (or file handle)
:return: trajectory.PoseTrajectory3D object
"""
raw_mat = csv_read_matrix(file_path, delim=" ", comment_str="#")
error_msg = ("TUM trajectory files must have 8 entries per row "
"and no trailing delimiter at the end of the rows (space)")
if len(raw_mat) > 0 and len(raw_mat[0]) != 8:
raise FileInterfaceException(error_msg)
try:
mat = np.array(raw_mat).astype(float)
except ValueError:
raise FileInterfaceException(error_msg)
stamps = mat[:, 0] # n x 1
xyz = mat[:, 1:4] # n x 3
quat = mat[:, 4:] # n x 4
quat = np.roll(quat, 1, axis=1) # shift 1 column -> w in front column
if not hasattr(file_path, 'read'): # if not file handle
logger.debug("Loaded {} stamps and poses from: {}".format(
len(stamps), file_path))
return PoseTrajectory3D(xyz, quat, stamps)
def read_bag_trajectory(bag_handle, topic):
"""
:param bag_handle: opened bag handle, from rosbag.Bag(...)
:param topic: trajectory topic of supported message type,
or a TF trajectory ID (e.g.: '/tf:map.base_link' )
:return: trajectory.PoseTrajectory3D
"""
from evo.tools import tf_cache
# Use TfCache instead if it's a TF transform ID.
if tf_cache.instance().check_id(topic):
return tf_cache.instance().get_trajectory(bag_handle, identifier=topic)
if not bag_handle.get_message_count(topic) > 0:
raise FileInterfaceException("no messages for topic '" + topic +
"' in bag")
msg_type = bag_handle.get_type_and_topic_info().topics[topic].msg_type
if msg_type not in SUPPORTED_ROS_MSGS:
raise FileInterfaceException(
"unsupported message type: {}".format(msg_type))
# Choose appropriate message conversion.
if msg_type == "geometry_msgs/TransformStamped":
get_xyz_quat = _get_xyz_quat_from_transform_stamped
else:
get_xyz_quat = _get_xyz_quat_from_pose_or_odometry_msg
stamps, xyz, quat = [], [], []
for topic, msg, _ in bag_handle.read_messages(topic):
# Use the header timestamps (converted to seconds).
t = msg.header.stamp
ref_traj = file_interface.read_kitti_poses_file(args.ref)
elif args.subcommand == "euroc":
for csv_file in args.state_gt_csv:
if csv_file == args.ref:
continue
else:
trajectories[
csv_file] = file_interface.read_euroc_csv_trajectory(
csv_file)
if args.ref:
ref_traj = file_interface.read_euroc_csv_trajectory(args.ref)
elif args.subcommand == "bag":
if not (args.topics or args.all_topics):
die("No topics used - specify topics or set --all_topics.")
if not os.path.exists(args.bag):
raise file_interface.FileInterfaceException(
"File doesn't exist: {}".format(args.bag))
import rosbag
logger.debug("Opening bag file " + args.bag)
bag = rosbag.Bag(args.bag)
try:
if args.all_topics:
topics = args.topics + file_interface.get_supported_topics(bag)
if args.ref in topics:
topics.remove(args.ref)
if len(topics) == 0:
die("No topics of supported types: {}".format(
" ".join(file_interface.SUPPORTED_ROS_MSGS)))
else:
topics = args.topics
for topic in topics:
if topic == args.ref:
:param topic: trajectory topic of supported message type,
or a TF trajectory ID (e.g.: '/tf:map.base_link' )
:return: trajectory.PoseTrajectory3D
"""
from evo.tools import tf_cache
# Use TfCache instead if it's a TF transform ID.
if tf_cache.instance().check_id(topic):
return tf_cache.instance().get_trajectory(bag_handle, identifier=topic)
if not bag_handle.get_message_count(topic) > 0:
raise FileInterfaceException("no messages for topic '" + topic +
"' in bag")
msg_type = bag_handle.get_type_and_topic_info().topics[topic].msg_type
if msg_type not in SUPPORTED_ROS_MSGS:
raise FileInterfaceException(
"unsupported message type: {}".format(msg_type))
# Choose appropriate message conversion.
if msg_type == "geometry_msgs/TransformStamped":
get_xyz_quat = _get_xyz_quat_from_transform_stamped
else:
get_xyz_quat = _get_xyz_quat_from_pose_or_odometry_msg
stamps, xyz, quat = [], [], []
for topic, msg, _ in bag_handle.read_messages(topic):
# Use the header timestamps (converted to seconds).
t = msg.header.stamp
stamps.append(t.secs + (t.nsecs * 1e-9))
xyz_t, quat_t = get_xyz_quat(msg)
xyz.append(xyz_t)
quat.append(quat_t)