-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy path2_1_test_both_control.py
199 lines (165 loc) · 8.13 KB
/
2_1_test_both_control.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
"""
In PX4 v1.9.0 Only first four Control Groups are supported
(https://github.com/PX4/Firmware/blob/v1.9.0/src/modules/mavlink/mavlink_receiver.cpp#L980)
This is confusing, since only 2 control groups can be used anyway. What does it mean?
This site may have the answer:
https://jalpanchal1.gitbooks.io/px4-developer-guide/content/en/concept/mixing.html
"""
##################################################################################################################################
import asyncio
from mavsdk import System
from mavsdk.offboard import OffboardError, ActuatorControl, ActuatorControlGroup, PositionNedYaw
import json
LOCAL_HOST_TEST = "udp://:14540"
CURR_USB_CONNECTION = "/dev/ttyACM1"
LIKELY_USB_PI_CONNECTION = "/dev/ttyUSB0"
####################################################################################################################################
def print_pretty_dict(d, indent=4):
"""
Prints the contents of a dictionary (and nested dictionaries) in a pretty format.
Parameters:
d (dict): The dictionary to print.
indent (int): Number of spaces to use for indentation.
"""
def print_dict(d, level=0):
"""Helper function to print dictionary contents."""
for key, value in d.items():
if isinstance(value, dict):
print(' ' * (level * indent) + f"{key}:")
print_dict(value, level + 1)
else:
print(' ' * (level * indent) + f"{key}: {value}")
print_dict(d)
#######################################################################################################################################################
def clamp(value, min_value, max_value):
"""Clamps the value between min_value and max_value."""
return max(min_value, min(value, max_value))
async def set_actuator_control_homemade(drone, elevator, aileron, rudder, starboard_front_motor,
rear_motor, front_motor, starboard_rear_motor, forward_propulsion_motor):
"""Sets the actuator control with two control groups and specific control modifications."""
# This is how the actuator outputs will be set up on Q Ground Control.
# TODO: Make test case to see which control group is FMU_MAIN and which is FMU AUX.
# (and, in turn, confirm that QGroundControl's FMU_MAIN connects to FMU_PWM_OUT on PPM physical outputs.)
group1_controls = [
float('nan'), # NaN for RC ROLL -- fix ordering
float('nan'), # NaN for RC PITCH
float('nan'), # NaN for RC YAW
float('nan'), # NaN for RC Throttle
clamp(elevator, -1, 1), # Elevator in second group
clamp(aileron, -1, 1), # Left Aileron in second group
clamp(rudder, -1, 1), # Rudder in second group
clamp(-aileron, -1, 1), # Right Aileron, reversed number
]
# Define second group with proper values and reverse clamp for aileron
group2_controls = [
float('nan'), # Unused
float('nan'), # Unused
float('nan'), # Unused
clamp(starboard_front_motor, 0, 1),
clamp(rear_motor, 0, 1),
clamp(front_motor, 0, 1),
clamp(starboard_rear_motor, 0, 1),
clamp(forward_propulsion_motor, 0, 1)
]
# Adjust right aileron control
right_aileron = clamp(-aileron, -1, 1) # Reverse aileron for right
group2_controls[1] = right_aileron # Update right aileron in the control group
# Create actuator control groups
control_group1 = ActuatorControlGroup(controls=group1_controls)
control_group2 = ActuatorControlGroup(controls=group2_controls)
# Set actuator control
try:
await drone.offboard.set_actuator_control(ActuatorControl(groups=[control_group1, control_group2]))
except OffboardError as e:
print(f"Setting actuator control failed with error: {e}")
# await asyncio.sleep(0.1)
#######################################################################################################################
async def setup_mavlink_offboard(drone, curr_conn=LOCAL_HOST_TEST):
"""
Abstracts away the setup of the MAVSDK MAVLink protocols.
Parameters:
drone (System()): The drone setup.
curr_con(string): determines how mavlink is connected. Defaults to local host 14450.
Returns:
True / False (bool): Determine if setup was successful or not.
"""
await drone.connect(system_address=curr_conn)
print("Waiting for drone to connect...")
async for state in drone.core.connection_state():
if state.is_connected:
print(f"-- Connected to drone!")
break
print("Waiting for drone to have a global position estimate...")
async for health in drone.telemetry.health():
if health.is_global_position_ok and health.is_home_position_ok:
print("-- Global position estimate OK")
break
print("-- Arming")
await drone.action.arm()
print("-- Setting initial setpoint")
await drone.offboard.set_position_ned(PositionNedYaw(0.0, 0.0, 0.0, 0.0))
print("-- Starting offboard")
try:
await drone.offboard.start()
except OffboardError as error:
print(f"Starting offboard mode failed with error code: {error._result.result}")
print("-- Disarming")
await drone.action.disarm()
return False
return True
##############################################################################################
# #####################################
# Rename the locally defined function for clarity
async def set_actuator_control_test(drone, elevator, aileron, rudder, starboard_front_motor,
rear_motor, front_motor, starboard_rear_motor, forward_propulsion_motor):
""" Testable version of set_actuator_control function. """
return await set_actuator_control_homemade(drone, elevator, aileron, rudder,
starboard_front_motor, rear_motor, front_motor, starboard_rear_motor,
forward_propulsion_motor)
# Unit tests for set_actuator_control
async def test_set_actuator_control():
""" Tests for set_actuator_control function. """
drone = System()
# Mock the drone connection and setup
await setup_mavlink_offboard(drone)
print("Running test cases...")
print("-- Go 0m North, 0m East, -5m Down within local coordinate system")
await drone.offboard.set_position_ned(PositionNedYaw(0.0, 0.0, -5.0, 0.0))
await asyncio.sleep(10)
# Test case 1: Normal input
print("Test case 1: Normal input")
await set_actuator_control_test(drone, 1, 0, 0, 0, 0, 0, 0, 0)
await asyncio.sleep(10)
print("-- Go 5m North, 10m East, -5m Down within local coordinate system")
await drone.offboard.set_position_ned(PositionNedYaw(5.0, 10.0, -5.0, 90.0))
await asyncio.sleep(5)
# Test case 2: Inputs outside bounds
print("Test case 2: Inputs outside bounds")
await set_actuator_control_test(drone, 1.2, -1.5, 0.7, 1.5, -0.2, 0.8, 0.9, 1.1)
await asyncio.sleep(10)
# Test case 3: Using two control groups
print("Test case 3: Using two control groups")
await set_actuator_control_test(drone, 0.3, -0.3, 0.0, 0.1, 0.2, 0.9, 0.4, 0.0)
await asyncio.sleep(10)
# Test case 4: All values at bounds
print("Test case 4: All values at bounds")
await set_actuator_control_test(drone, 1.0, -1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0)
await asyncio.sleep(10)
print("-- Go 5m North, 10m East, -5m Down within local coordinate system")
await drone.offboard.set_position_ned(PositionNedYaw(5.0, 10.0, -5.0, 90.0))
await asyncio.sleep(5)
print("-- Go 0m North, 10m East, 0m Down within local coordinate system, turn to face South")
await drone.offboard.set_position_ned(PositionNedYaw(0.0, 10.0, 0.0, 180.0))
await asyncio.sleep(10)
print("-- Stopping offboard")
try:
await drone.offboard.stop()
except OffboardError as error:
print(f"Stopping offboard mode failed with error code: {error._result.result}")
# Main function to run tests
async def run_tests():
""" Run all test cases. """
await test_set_actuator_control()
# Run the asyncio event loop for tests
if __name__ == "__main__":
asyncio.run(run_tests())