-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathpostprocess.py
226 lines (189 loc) · 7.63 KB
/
postprocess.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
# -*- coding: utf-8 -*-
"""
Post-Processing of the eye camera images and T265 kinematic head
# Create/Load the marker_times.yaml file
# Create/Load the odo_times.yaml file
data
@author: 2023-03-18 - Michael Davis
@author: 2023-03-27 - Michelle Greene going back to argparse
@author: 2023-04-24 - Michelle Greene removing ODO for speed
"""
###################
##### Imports #####
###################
# Public
import argparse
from datetime import datetime
import os
import yaml
import numpy as np
import pupil_recording_interface
# The following are only necessary for odometry post-processing
#from scipy import signal
#import plotly.graph_objects as go
#import pandas as pd
##########################
##### File Arguments #####
##########################
# Read folder from command-line input
parser = argparse.ArgumentParser()
parser.add_argument("-f", "--folder", help = "input the folder location where all your files are located.")
args = parser.parse_args()
# error if no folder
if not args.folder:
raise ValueError('Please indicate session folder YYYY-MM-DD-HH-MM-SS')
else:
input_folder = args.folder
# Inputs
marker_times_yaml = os.path.join(input_folder, "marker_times.yaml")
odo_times_yaml = os.path.join(input_folder, "odo_times.yaml")
world_timestamps = np.load(os.path.join(input_folder, "world_timestamps.npy"))
odometry_data = pupil_recording_interface.load_dataset(
input_folder, odometry="recording", cache=False)
#################################
##### SPECIFY MARKER EPOCHS #####
#################################
# This is ripped from vedb_store.utils so that we don't have to rely on that library
# https://github.com/vedb/vedb-store/blob/main/vedb_store/utils.py
def specify_marker_epochs(timestamps, yaml_file, fps=30):
"""Manually add the markings for the markers.yaml file
TODO: add more info here
"""
# Initialize
ordinals = ["first", "second", "third", "fourth", "fifth", "too many"]
marker_type = ["calibration", "validation"]
markers = {}
# Take user inputs
for mk in marker_type:
markers[mk + "_orig_times"] = []
for count in ordinals:
print(f"\n=== {count.capitalize()} {mk} epoch ===")
minsec_str = input("Please enter start of epoch as `min,sec` : ")
min_start, sec_start = [float(x) for x in minsec_str.split(",")]
minsec_str = input("Please enter end of epoch as `min,sec` : ")
min_end, sec_end = [float(x) for x in minsec_str.split(",")]
markers[f"{mk}_orig_times"].append(
[min_start * 60 + sec_start, min_end * 60 + sec_end]
)
quit = input(f"Enter additional {mk}? (y/n): ")
if quit[0].lower() == "n":
break
mka = np.array(markers[f"{mk}_orig_times"]).astype(int)
markers[f"{mk}_frames"] = [[int(a), int(b)] for (a, b) in mka * fps]
markers[f"{mk}_times"] = [
[int(np.floor(a)), int(np.ceil(b))] for (a, b) in timestamps[mka * fps]
]
# Save results
with open(yaml_file, mode="w", encoding="utf-8") as fid:
yaml.dump(markers, fid)
return markers
#########################
##### Odometry Plot #####
#########################
# def start_end_plot(odometry, marker_times, yaml_file):
# """Use a plot fo find the marker times?
# TODO: add more info here
# """
# # Read in the marker validation times
# all_times = marker_times
# # Convert times
# times = np.array(odometry.time.values)
# time_convert = np.zeros(len(times))
# for i in range(len(times)):
# time_convert[i] = (
# odometry.time.values[i] - odometry.time.values[0]
# ) / 1000000000
# # Plotting accesories
# input_value = all_times["validation_orig_times"][0][1]
# indx = (np.abs(time_convert - input_value)).argmin()
# samps = 200 * 120
# # smooth data for better viewing purposes
# pitch_vel = signal.savgol_filter(odometry.angular_velocity[:, 0], 101, 2)
# yaw_vel = signal.savgol_filter(odometry.angular_velocity[:, 1], 101, 2)
# # Start plotting
# fig = go.Figure()
# fig.add_trace(
# go.Scatter(
# x=odometry.time.values,
# y=pitch_vel,
# name="pitch", # this sets its legend entry
# )
# )
# fig.add_trace(
# go.Scatter(
# x=odometry.time.values, y=yaw_vel, name="yaw", # this sets its legend entry
# )
# )
# fig.update_layout(
# title="Angular velocity odometry",
# xaxis_title="Time stamps (datetime)",
# yaxis_title="Angular Velocity (radians/second)",
# # xaxis_range=[
# # odometry.time.values[indx], #this may be added back in the future - sometimes throws an out-of-bounds error
# # odometry.time.values[indx+samps]] #this may be added back in the future - sometimes throws an out-of-bounds error
# )
# fig.show()
# # Loop
# done = False
# times = []
# while not done:
# pitch_start = input("Pitch Timestamp Start (HH:mm:ss format): ")
# pitch_end = input("Pitch Timestamp End (HH:mm:ss format): ")
# yaw_start = input("Yaw Timestamp Start (HH:mm:ss format): ")
# yaw_end = input("Yaw Timestamp End (HH:mm:ss format): ")
# df_time = pd.Series(odometry.time[0].values)
# pitch_start = datetime.combine(
# df_time.dt.date.values[0],
# datetime.strptime(pitch_start, "%H:%M:%S").time(),
# )
# pitch_end = datetime.combine(
# df_time.dt.date.values[0], datetime.strptime(pitch_end, "%H:%M:%S").time(),
# )
# yaw_start = datetime.combine(
# df_time.dt.date.values[0], datetime.strptime(yaw_start, "%H:%M:%S").time(),
# )
# yaw_end = datetime.combine(
# df_time.dt.date.values[0], datetime.strptime(yaw_end, "%H:%M:%S").time()
# )
# tmp = {
# "calibration": {
# "pitch_start": pitch_start,
# "pitch_end": pitch_end,
# "yaw_start": yaw_start,
# "yaw_end": yaw_end,
# }
# }
# times.append(tmp)
# next_calibration = input("Continue for another pitch/yaw calibration? (y/n) ")
# done = next_calibration != "y"
# # Save output
# with open(yaml_file, mode="w", encoding="utf-8") as fid:
# yaml.dump(times, fid)
# return times
####################################################
##### Load the files for this tracking session #####
####################################################
if __name__ == "__main__":
# Check folders for existance
print(f"** Input folder selected: {input_folder}")
if not os.path.isdir(input_folder):
raise ValueError("Input Folder does not exist. Check the path")
# Check if markers exist and fill in with world timestamps if not
if os.path.exists(marker_times_yaml):
with open(marker_times_yaml, "r", encoding="utf-8") as file:
marker_times = yaml.safe_load(file)
else:
marker_times = specify_marker_epochs(
timestamps=world_timestamps, yaml_file=marker_times_yaml
)
print("** Marker Times Loaded")
# Execute head calibration and timestamp functions
# if os.path.exists(odo_times_yaml):
# print("** Start & End points for head shakes and nods already exist")
# else:
# print("** Execute head calibration for nods and shakes...")
# times = start_end_plot(
# odometry=odometry_data, marker_times=marker_times, yaml_file=odo_times_yaml
# )
# print("** ODO Times Loaded")
print("\n** DONE WITH SETTING MARKER TIMES ** \n")