Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def fake_trajectory(length, timestamp_distance, start_time=0.):
return PoseTrajectory3D(
poses_se3=random_se3_list(length), timestamps=fake_timestamps(
length, timestamp_distance, start_time))
def test_alignment_degenerate_case(self):
length = 100
poses = [lie.random_se3()] * length
traj_1 = PoseTrajectory3D(
poses_se3=poses,
timestamps=helpers.fake_timestamps(length, 1, 0.0))
traj_2 = copy.deepcopy(traj_1)
traj_2.transform(lie.random_se3())
traj_2.scale(1.234)
self.assertNotEqual(traj_1, traj_2)
with self.assertRaises(GeometryException):
trajectory.align_trajectory(traj_1, traj_2)
with self.assertRaises(GeometryException):
trajectory.align_trajectory(traj_1, traj_2, correct_scale=True)
def merge(trajectories):
"""
Merges multiple trajectories into a single, timestamp-sorted one.
:param trajectories: list of PoseTrajectory3D objects
:return: merged PoseTrajectory3D
"""
merged_stamps = np.concatenate([t.timestamps for t in trajectories])
merged_xyz = np.concatenate([t.positions_xyz for t in trajectories])
merged_quat = np.concatenate(
[t.orientations_quat_wxyz for t in trajectories])
order = merged_stamps.argsort()
merged_stamps = merged_stamps[order]
merged_xyz = merged_xyz[order]
merged_quat = merged_quat[order]
return PoseTrajectory3D(merged_xyz, merged_quat, merged_stamps)
def get_statistics(self):
"""
:return: dictionary with some statistics of the trajectory
"""
stats = super(PoseTrajectory3D, self).get_statistics()
speeds = [
calc_speed(self.positions_xyz[i], self.positions_xyz[i + 1],
self.timestamps[i], self.timestamps[i + 1])
for i in range(len(self.positions_xyz) - 1)
]
vmax = max(speeds)
vmin = min(speeds)
vmean = np.mean(speeds)
stats.update({
"v_max (m/s)": vmax,
"v_min (m/s)": vmin,
"v_avg (m/s)": vmean,
"v_max (km/h)": vmax * 3.6,
"v_min (km/h)": vmin * 3.6,
"v_avg (km/h)": vmean * 3.6
})
get_xyz_quat = _get_xyz_quat_from_pose_or_odometry_msg
stamps, xyz, quat = [], [], []
for topic, msg, _ in bag_handle.read_messages(topic):
# Use the header timestamps (converted to seconds).
t = msg.header.stamp
stamps.append(t.secs + (t.nsecs * 1e-9))
xyz_t, quat_t = get_xyz_quat(msg)
xyz.append(xyz_t)
quat.append(quat_t)
logger.debug("Loaded {} {} messages of topic: {}".format(
len(stamps), msg_type, topic))
generator = bag_handle.read_messages(topic)
_, first_msg, _ = next(generator)
frame_id = first_msg.header.frame_id
return PoseTrajectory3D(xyz, quat, stamps, meta={"frame_id": frame_id})
def write_tum_trajectory_file(file_path, traj, confirm_overwrite=False):
"""
:param file_path: desired text file for trajectory (string or handle)
:param traj: trajectory.PoseTrajectory3D
:param confirm_overwrite: whether to require user interaction
to overwrite existing files
"""
if isinstance(file_path, str) and confirm_overwrite:
if not user.check_and_confirm_overwrite(file_path):
return
if not isinstance(traj, PoseTrajectory3D):
raise FileInterfaceException(
"trajectory must be a PoseTrajectory3D object")
stamps = traj.timestamps
xyz = traj.positions_xyz
# shift -1 column -> w in back column
quat = np.roll(traj.orientations_quat_wxyz, -1, axis=1)
mat = np.column_stack((stamps, xyz, quat))
np.savetxt(file_path, mat, delimiter=" ")
if isinstance(file_path, str):
logger.info("Trajectory saved to: " + file_path)
def associate_trajectories(traj_1, traj_2, max_diff=0.01, offset_2=0.0,
first_name="first trajectory",
snd_name="second trajectory"):
"""
Synchronizes two trajectories by matching their timestamps.
:param traj_1: trajectory.PoseTrajectory3D object of first trajectory
:param traj_2: trajectory.PoseTrajectory3D object of second trajectory
:param max_diff: max. allowed absolute time difference for associating
:param offset_2: optional time offset of second trajectory
:param first_name: name of first trajectory for verbose logging
:param snd_name: name of second trajectory for verbose/debug logging
:return: traj_1, traj_2 (synchronized)
"""
if not isinstance(traj_1, PoseTrajectory3D) \
or not isinstance(traj_2, PoseTrajectory3D):
raise SyncException("trajectories must be PoseTrajectory3D objects")
snd_longer = len(traj_2.timestamps) > len(traj_1.timestamps)
traj_long = copy.deepcopy(traj_2) if snd_longer else copy.deepcopy(traj_1)
traj_short = copy.deepcopy(traj_1) if snd_longer else copy.deepcopy(traj_2)
max_pairs = len(traj_short.timestamps)
# First, match the timestamps of the shorter trajectory to the longer one.
matching_indices = matching_time_indices(
traj_short.timestamps, traj_long.timestamps, max_diff,
offset_2 if snd_longer else -offset_2)
traj_long.reduce_to_ids(matching_indices)
# Next, reversely match the reduced long trajectory to the shorter one.
matching_indices = matching_time_indices(
traj_long.timestamps, traj_short.timestamps, max_diff,
plot_collection = plot.PlotCollection("evo_traj - trajectory plot")
fig_xyz, axarr_xyz = plt.subplots(3, sharex="col", figsize=tuple(
SETTINGS.plot_figsize))
fig_rpy, axarr_rpy = plt.subplots(3, sharex="col", figsize=tuple(
SETTINGS.plot_figsize))
fig_traj = plt.figure(figsize=tuple(SETTINGS.plot_figsize))
plot_mode = plot.PlotMode[args.plot_mode]
ax_traj = plot.prepare_axis(fig_traj, plot_mode)
# for x-axis alignment starting from 0 with --plot_relative_time
start_time = None
if args.ref:
if isinstance(ref_traj, trajectory.PoseTrajectory3D) \
and args.plot_relative_time:
start_time = ref_traj.timestamps[0]
short_traj_name = os.path.splitext(os.path.basename(args.ref))[0]
if SETTINGS.plot_usetex:
short_traj_name = short_traj_name.replace("_", "\\_")
plot.traj(ax_traj, plot_mode, ref_traj,
style=SETTINGS.plot_reference_linestyle,
color=SETTINGS.plot_reference_color,
label=short_traj_name,
alpha=SETTINGS.plot_reference_alpha)
plot.draw_coordinate_axes(ax_traj, ref_traj, plot_mode,
SETTINGS.plot_axis_marker_scale)
plot.traj_xyz(
axarr_xyz, ref_traj, style=SETTINGS.plot_reference_linestyle,
color=SETTINGS.plot_reference_color, label=short_traj_name,
elif only_scale:
title += "\n(scale corrected)"
elif align_origin:
title += "\n(with origin alignment)"
else:
title += "\n(not aligned)"
ape_result = ape_metric.get_result(ref_name, est_name)
ape_result.info["title"] = title
logger.debug(SEP)
logger.info(ape_result.pretty_str())
ape_result.add_trajectory(ref_name, traj_ref)
ape_result.add_trajectory(est_name, traj_est)
if isinstance(traj_est, trajectory.PoseTrajectory3D):
seconds_from_start = [
t - traj_est.timestamps[0] for t in traj_est.timestamps
]
ape_result.add_np_array("seconds_from_start", seconds_from_start)
ape_result.add_np_array("timestamps", traj_est.timestamps)
return ape_result
error_msg = ("TUM trajectory files must have 8 entries per row "
"and no trailing delimiter at the end of the rows (space)")
if len(raw_mat) > 0 and len(raw_mat[0]) != 8:
raise FileInterfaceException(error_msg)
try:
mat = np.array(raw_mat).astype(float)
except ValueError:
raise FileInterfaceException(error_msg)
stamps = mat[:, 0] # n x 1
xyz = mat[:, 1:4] # n x 3
quat = mat[:, 4:] # n x 4
quat = np.roll(quat, 1, axis=1) # shift 1 column -> w in front column
if not hasattr(file_path, 'read'): # if not file handle
logger.debug("Loaded {} stamps and poses from: {}".format(
len(stamps), file_path))
return PoseTrajectory3D(xyz, quat, stamps)