Skip to content

Commit

Permalink
chore: implemented robotcell + execute * abstract robot and deps
Browse files Browse the repository at this point in the history
  • Loading branch information
cbiering committed Feb 5, 2025
1 parent 2ca1743 commit c1654f8
Show file tree
Hide file tree
Showing 8 changed files with 707 additions and 79 deletions.
5 changes: 4 additions & 1 deletion examples/03_move_and_set_ios.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@


async def main():
def on_movement(motion_state):
print(motion_state)

async with Nova() as nova:
cell = nova.cell()
controller = await cell.ensure_virtual_robot_controller(
Expand All @@ -42,7 +45,7 @@ async def main():
jnt(home_joints),
]

await motion_group.plan_and_execute(actions, tcp)
await motion_group.plan_and_execute(actions, tcp, on_movement=on_movement)

await cell.delete_robot_controller(controller.name)

Expand Down
6 changes: 5 additions & 1 deletion nova/actions.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from nova.motion_settings import MotionSettings
from nova.types.collision_scene import CollisionScene
from nova.types.pose import Pose
from nova.types.state import MotionState


class Action(pydantic.BaseModel, ABC):
Expand Down Expand Up @@ -497,6 +498,7 @@ def to_set_io(self) -> list[wb.models.SetIO]:
]


# TODO: should not be located here
class MovementControllerContext(pydantic.BaseModel):
combined_actions: CombinedActions
motion_id: str
Expand All @@ -507,4 +509,6 @@ class MovementControllerContext(pydantic.BaseModel):
MovementControllerFunction = Callable[
[ExecuteTrajectoryResponseStream], ExecuteTrajectoryRequestStream
]
MovementController = Callable[[MovementControllerContext], MovementControllerFunction]
MovementController = Callable[
[MovementControllerContext, Callable[[MotionState], None] | None], MovementControllerFunction
]
113 changes: 49 additions & 64 deletions nova/core/motion_group.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,34 +3,40 @@

from nova.actions import Action, CombinedActions, MovementController, MovementControllerContext
from nova.core.exceptions import LoadPlanFailed, PlanTrajectoryFailed
from nova.core.movement_controller import move_forward
from nova.core.movement_controller import move_forward, motion_group_state_to_motion_state
from nova.gateway import ApiGateway
from nova.types import InitialMovementConsumer, InitialMovementStream, LoadPlanResponse
from nova.types.pose import Pose
from nova.types import (
InitialMovementStream,
LoadPlanResponse,
Pose,
DhParameter,
MotionState,
RobotState,
)
from nova.core.robot_cell import AbstractRobot
from typing import Callable

MAX_JOINT_VELOCITY_PREPARE_MOVE = 0.2
START_LOCATION_OF_MOTION = 0.0


class MotionGroup:
def __init__(
self, api_gateway: ApiGateway, cell: str, motion_group_id: str, is_activated: bool = False
):
class MotionGroup(AbstractRobot):
def __init__(self, api_gateway: ApiGateway, cell: str, motion_group_id: str):
self._api_gateway = api_gateway
self._motion_api_client = api_gateway.motion_api
self._cell = cell
self._motion_group_id = motion_group_id
self._current_motion: str | None = None
self._optimizer_setup: wb.models.OptimizerSetup | None = None
self.is_activated = is_activated
super().__init__()

async def __aenter__(self):
async def open(self):
await self._api_gateway.motion_group_api.activate_motion_group(
cell=self._cell, motion_group=self._motion_group_id
)
return self

async def __aexit__(self, exc_type, exc_val, exc_tb):
async def close(self):
# RPS-1174: when a motion group is deactivated, RAE closes all open connections
# this behaviour is not desired in some cases,
# so for now we will not deactivate for the user
Expand All @@ -46,23 +52,7 @@ def current_motion(self) -> str | None:
# raise ValueError("No MotionId attached. There is no planned motion available.")
return self._current_motion

async def plan(self, actions: list[Action] | Action, tcp: str) -> wb.models.JointTrajectory:
"""Plan a trajectory for the given actions
Args:
actions (list[Action] | Action): The actions to be planned. Can be a single action or a list of actions.
Only motion actions are considered for planning.
tcp (str): The identifier of the tool center point (TCP)
Returns:
wb.models.JointTrajectory: The planned joint trajectory
"""
if not isinstance(actions, list):
actions = [actions]

if len(actions) == 0:
raise ValueError("No actions provided")

async def _plan(self, actions: list[Action], tcp: str) -> wb.models.JointTrajectory:
motion_commands = CombinedActions(items=tuple(actions)).to_motion_command() # type: ignore
joints = await self.joints()
robot_setup = await self._get_optimizer_setup(tcp=tcp)
Expand All @@ -83,48 +73,51 @@ async def plan(self, actions: list[Action] | Action, tcp: str) -> wb.models.Join
raise PlanTrajectoryFailed(plan_trajectory_response.response.actual_instance)
return plan_trajectory_response.response.actual_instance

async def execute(
async def _execute(
self,
joint_trajectory: wb.models.JointTrajectory,
tcp: str,
actions: list[Action] | Action | None = None,
# collision_scene: dts.CollisionScene | None,
movement_controller: MovementController = move_forward,
initial_movement_consumer: InitialMovementConsumer | None = None,
actions: list[Action],
on_movement: Callable[[MotionState], None],
movement_controller: MovementController | None = move_forward,
):
"""Execute a planned motion
Args:
joint_trajectory (wb.models.JointTrajectory): The planned joint trajectory
tcp (str): The identifier of the tool center point (TCP)
actions (list[Action] | Action | None): The actions to be executed. Defaults to None.
movement_controller (MovementController): The movement controller to be used. Defaults to move_forward
initial_movement_consumer (InitialMovementConsumer): A consumer for the initial movement
"""
if actions is None:
actions = []
elif not isinstance(actions, list):
actions = [actions]

# Load planned trajectory
load_plan_response = await self._load_planned_motion(joint_trajectory, tcp)

# Move to start position
number_of_joints = await self._get_number_of_joints()
joints_velocities = [MAX_JOINT_VELOCITY_PREPARE_MOVE] * number_of_joints
movement_stream = await self.move_to_start_position(joints_velocities, load_plan_response)

# If there's an initial consumer, feed it the data
async for move_to_response in movement_stream:
if initial_movement_consumer is not None:
initial_movement_consumer(move_to_response)
if on_movement is not None:
# TODO: refactor
if (
move_to_response.state is None
or move_to_response.state.motion_groups is None
or len(move_to_response.state.motion_groups) == 0
or move_to_response.move_response is None
or move_to_response.move_response.current_location_on_trajectory is None
):
continue

# TODO: maybe 1-...
motion_state = motion_group_state_to_motion_state(
move_to_response.state.motion_groups[0],
float(move_to_response.move_response.current_location_on_trajectory),
)
on_movement(motion_state)

movement_controller_context = MovementControllerContext(
combined_actions=CombinedActions(items=tuple(actions)), # type: ignore
motion_id=load_plan_response.motion,
controller = movement_controller(
MovementControllerContext(
combined_actions=CombinedActions(items=tuple(actions)), # type: ignore
motion_id=load_plan_response.motion,
),
on_movement=on_movement,
)
_movement_controller = movement_controller(movement_controller_context)
await self._api_gateway.motion_api.execute_trajectory(self._cell, _movement_controller)

async def plan_and_execute(self, actions: list[Action] | Action, tcp: str):
joint_trajectory = await self.plan(actions, tcp)
await self.execute(joint_trajectory, tcp, actions=actions)
await self._api_gateway.motion_api.execute_trajectory(self._cell, controller)

async def _get_number_of_joints(self) -> int:
spec = await self._api_gateway.motion_group_infos_api.get_motion_group_specification(
Expand Down Expand Up @@ -188,24 +181,16 @@ async def stop(self):
logger.debug(f"No motion to stop for {self}: {e}")

async def get_state(self, tcp: str | None = None) -> wb.models.MotionGroupStateResponse:
"""Get the current state of the motion group
Args:
tcp (str): The identifier of the tool center point (TCP) to be used for tcp_pose in response. If not set,
the flange pose is returned as tcp_pose.
"""
response = await self._api_gateway.motion_group_infos_api.get_current_motion_group_state(
cell=self._cell, motion_group=self.motion_group_id, tcp=tcp
)
return response

async def joints(self) -> tuple:
"""Get the current joint positions"""
state = await self.get_state()
return tuple(state.state.joint_position.joints)

async def tcp_pose(self, tcp: str | None = None) -> Pose:
"""Get the current TCP pose"""
state = await self.get_state(tcp=tcp)
return Pose(state.state.tcp_pose)

Expand Down
77 changes: 66 additions & 11 deletions nova/core/movement_controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,50 @@
MovementControllerContext,
MovementControllerFunction,
)
from nova.types import MotionState, Pose, RobotState
from nova.core.exceptions import InitMovementFailed
from typing import Callable


def move_forward(context: MovementControllerContext) -> MovementControllerFunction:
def movement_to_motion_state(movement: wb.models.Movement) -> MotionState | None:
"""Convert a wb.models.Movement to a MotionState."""
if (
movement.movement.state is None
or movement.movement.state.motion_groups is None
or len(movement.movement.state.motion_groups) == 0
or movement.movement.current_location is None
):
return None

# TODO: in which cases do we have more than one motion group here?
motion_group = movement.movement.state.motion_groups[0]
return motion_group_state_to_motion_state(
motion_group, float(movement.movement.current_location)
)


def motion_group_state_to_motion_state(
motion_group_state: wb.models.MotionGroupState, path_parameter: float
) -> MotionState:
tcp_pose = Pose(motion_group_state.tcp_pose)
joints = (
tuple(motion_group_state.joint_current.joints) if motion_group_state.joint_current else None
)
return MotionState(
path_parameter=path_parameter, state=RobotState(pose=tcp_pose, joints=joints)
)


# TODO: generalize common functionality
def move_forward(
context: MovementControllerContext, on_movement: Callable[[MotionState], None] | None
) -> MovementControllerFunction:
"""
movement_controller is an async function that yields requests to the server.
If a movement_consumer is provided, we'll asend() each wb.models.MovementMovement to it,
letting it produce MotionState objects.
"""

async def movement_controller(
response_stream: ExecuteTrajectoryResponseStream,
) -> ExecuteTrajectoryRequestStream:
Expand All @@ -34,16 +74,25 @@ async def movement_controller(

# then we wait until the movement is finished
async for execute_trajectory_response in response_stream:
r2 = execute_trajectory_response.actual_instance
# Terminate the generator
if isinstance(r2, wb.models.Standstill):
if r2.standstill.reason == wb.models.StandstillReason.REASON_MOTION_ENDED:
instance = execute_trajectory_response.actual_instance

# Send the current location to the consumer
if isinstance(instance, wb.models.Movement) and instance.movement and on_movement:
motion_state = movement_to_motion_state(instance)
if motion_state:
on_movement(motion_state)

# Stop when standstill indicates motion ended
if isinstance(instance, wb.models.Standstill):
if instance.standstill.reason == wb.models.StandstillReason.REASON_MOTION_ENDED:
return

return movement_controller


def speed_up(context: MovementControllerContext) -> MovementControllerFunction:
def speed_up(
context: MovementControllerContext, on_movement: Callable[[MotionState], None] | None
) -> MovementControllerFunction:
async def movement_controller(
response_stream: ExecuteTrajectoryResponseStream,
) -> ExecuteTrajectoryRequestStream:
Expand All @@ -70,14 +119,20 @@ async def movement_controller(
# then we wait until the movement is finished
async for execute_trajectory_response in response_stream:
counter += 1
response = execute_trajectory_response.actual_instance
instance = execute_trajectory_response.actual_instance
# Send the current location to the consumer
if isinstance(instance, wb.models.Movement) and on_movement:
motion_state = movement_to_motion_state(instance)
if motion_state:
on_movement(motion_state)

# Terminate the generator
if isinstance(response, wb.models.Standstill):
if response.standstill.reason == wb.models.StandstillReason.REASON_MOTION_ENDED:
if isinstance(instance, wb.models.Standstill):
if instance.standstill.reason == wb.models.StandstillReason.REASON_MOTION_ENDED:
return

if isinstance(response, wb.models.PlaybackSpeedResponse):
playback_speed = response.playback_speed_response
if isinstance(instance, wb.models.PlaybackSpeedResponse):
playback_speed = instance.playback_speed_response
logger.info(f"Current playback speed: {playback_speed}")

if counter % 10 == 0:
Expand Down
Loading

0 comments on commit c1654f8

Please sign in to comment.