import logging
import socket
from owl_client.client.Interfaces import GRPClient, RobotMonitorSocket
from owl_client.client.Interfaces.utils import *
[docs]class OwlClient(object):
"""Class to act as client interface to interact with OWLRobot over a network(connection) recommended to use
ethernet. OWLRobot running a GRPC server with a socket interface that make the robot state data available at 125Hz with no load.
This client class initialise a grpc channel stub which can call the OWLRobot GRPC server functions. These GRPC calls are wrapped by
this class methods that can be called from OwlClient object. A socket listener is also implemented which
listen to OWLRobot state socket interface and update the state data.
"""
def __init__(self, host):
self.logger = logging.getLogger("OWLRobot")
self.logger.setLevel(logging.DEBUG)
self.grpc_client_port = 18861
self.monitor_interface_port = 30001
self.host = host
self.logger.debug("Opening robot monitor socket.....")
self.robot_monitor = RobotMonitorSocket(self.host, self.monitor_interface_port)
self.grpc_client = GRPClient(self.host, self.grpc_client_port)
self.robot_monitor.start()
self.logger.info("Started OWl Client.")
def __repr__(self):
return "Robot Object (IP=%s, state=%s)" % (self.host, self.__get_robot_mode())
def __str__(self):
return self.__repr__()
def __enter__(self):
return self
def __exit__(self, exc_type, exc_value, traceback):
self.close()
[docs] def is_running(self) -> bool:
"""
Return True if robot is available to operate else False.
"""
_mode = self.robot_monitor.getRobotMode(wait=True)
return _mode != RobotMode.POWER_OFF and _mode != RobotMode.NOT_AVAILABLE and _mode!= RobotMode.ERROR
[docs] def is_script_running(self) -> bool:
"""
Return True if robot is running xml script program else False.
"""
return self.robot_monitor.getProgramStatus(wait=True) == ProgramMode.RUNNING
[docs] def get_joint(self, wait=True) -> Joint:
"""
Return the current joint values of robot,wait will allow to return latest data recieved from robot.
"""
return Joint(*self.robot_monitor.getQActual(wait))
[docs] def get_tcp(self, wait=True) -> Pose:
"""
Return the current tcp values of robot,wait will allow to return latest data recieved from robot.
"""
return Pose(*self.robot_monitor.getTCPActual(wait))
[docs] def get_io(self, wait=True) -> list:
"""
Return the digital input status of robot,wait will allow to return latest data recieved from robot.
"""
return self.robot_monitor.getIOStatus(wait)
[docs] def get_robot_mode(self, wait=False) -> int:
"""
Return the robot current operational mode, wait will allow to return latest data recieved from robot.
"""
return self.robot_monitor.getRobotMode(wait)
[docs] def get_program_mode(self , wait=False) -> int:
"""
Return the robot current program execution mode, wait will allow to return latest data recieved from robot.
"""
return self.robot_monitor.getProgramStatus(wait)
[docs] def get_fk(self, joints : list) -> list :
"""
Call robot server to solve forward kinematics request for given joint values.
"""
return self.grpc_client.fk_request(joints)
[docs] def get_ik(self, initial_guess : list, pose : list) -> list:
"""
Call robot server to solve inverse kinematics request for given inital guess and pose.
"""
return self.grpc_client.ik_request(initial_guess, pose)
[docs] def set_payload(self, payload : float):
"""
Call robot server to set the payload.
"""
return self.grpc_client.pay_load(payload)
[docs] def set_gravity(self , g_x : float , g_y : float , g_z : float):
"""
Call robot server to set gravity.
"""
return self.grpc_client.Gravity(g_x , g_y , g_z)
[docs] def power_on(self):
"""
Call robot server to power on.
"""
return self.grpc_client.power_on()
[docs] def power_down(self):
"""
Call robot server to power down.
"""
return self.grpc_client.power_down()
[docs] def enter_teach_mode(self):
"""
Call robot server to enter TeachMode.
"""
return self.grpc_client.enter_teach_mode()
[docs] def end_teach_mode(self):
"""
Call robot server to end TeachMode.
"""
return self.grpc_client.end_teach_mode()
[docs] def set_digital_output(self, digital_pin: int, digital_status: bool):
"""
Call robot server to set the status of digital output pin.
"""
return self.grpc_client.set_digital_output(digital_pin, digital_status)
[docs] def send_script(self, script: str):
"""
Send a xml script program to robot to run.
"""
self.grpc_client.send_script_program(script)
[docs] def pause_script(self):
"""
Pause a running script program
"""
self.grpc_client.pause_script_program()
[docs] def resume_script(self):
"""
Resume a paused script program
"""
self.grpc_client.resume_script_program()
[docs] def stop_script(self):
"""
Stop a script program
"""
self.grpc_client.stop_script_program()
[docs] def move_to_pose( self, goalPose: Pose, toolSpeed: float, wait=True, relative=False, moveType=TrajectoryPlanMode.STRAIGHT):
"""
Request robot server to move to goal pose [x(m), y(m), z(m), roll(rad), pitch(rad), yaw(rad)] with desired tool speed in cartesian space.
Parameters:
goalPose (Pose) : Goal pose robot need to achieve with desired tool speed.\n
toolSpeed(float) : Tool speed with which robot need to do move in mm/sec.\n
wait (bool) : True will make the move call synchronouse and wait till move is completed.\n
relative (bool) : Move relative to current robot pose.\n
moveType (int) : Type of move plan need to generate for move.\n
"""
if type(goalPose) != Pose:
raise Exception("Argument [goalPose] should be type of Pose")
command = Pose()
if relative:
current_pose = self.get_tcp(wait=True)
command.x = current_pose.x + goalPose.x
command.y = current_pose.y + goalPose.y
command.z = current_pose.z + goalPose.z
command.roll = current_pose.roll + goalPose.roll
command.pitch = current_pose.pitch + goalPose.pitch
command.yaw = current_pose.yaw + goalPose.yaw
else:
command = goalPose
return self.grpc_client.move_to_pose(command.get_pose(), toolSpeed, move_type=moveType, wait=wait)
[docs] def move_to_joint(self, jointPose: Joint, jointSpeed: float, wait=True, relative=False):
"""
Request robot server to move to goal joint [Base(rad), Shoulder(rad), Elbow(rad), Wrist1(rad), Wrist2(rad), Wrist3(rad)] with desired tool speed in joint space.
Parameters:
goalJoint (Joint) : Goal joint in radians robot need to achieve with desired joint speed.
jointSpeed (float) : Joint speed with which robot need to do move in degrees/sec.
wait (bool) : True will make the move call synchronouse and wait till move is completed.
relative(bool) : Move relative to current robot joint.
"""
if type(jointPose) != Joint:
raise Exception("Argument [jointPose] should be type of Joint")
command = Joint()
if relative:
current_joint = self.get_joint(wait=True)
command.Base = current_joint.Base + jointPose.Base
command.Elbow = current_joint.Elbow + jointPose.Elbow
command.Shoulder = current_joint.Shoulder + jointPose.Shoulder
command.Wrist1 = current_joint.Wrist1 + jointPose.Wrist1
command.Wrist2 = current_joint.Wrist2 + jointPose.Wrist2
command.Wrist3 = current_joint.Wrist3 + jointPose.Wrist3
else:
command = jointPose
return self.grpc_client.move_to_joint(command.get_joints(), jointSpeed, wait)
[docs] def move_process(self, processWaypoints: list, toolSpeed: float, blendFactor: float = 80, wait=True):
"""
Request robot server to do move process with desired tool speed.
Parameters:
processWaypoints (list) : Cartesian poses to do a move process.\n
toolSpeed (float) : Tool speed with which robot need to do move.\n
blendFactor(float) : Configure the blending radius by blend factor between [0,100].\n
wait (bool) : True will make the move call synchronouse and wait till move is completed.\n
"""
return self.grpc_client.move_process(processWaypoints, toolSpeed, blendFactor, wait)
[docs] def move_trajectory(self, trajectory : Trajectory):
"""
Request robot server to follow a joint trajectory.
Parameters:
trajectory (Trajectory) : Desired trajectory need to follow by robot.\n
"""
if type(trajectory) != Trajectory:
raise Exception("Argument [trajectory] should be type of Trajectory")
return self.grpc_client.move_trajectory(trajectory)
[docs] def move_translate(self, x=0.0, y=0.0, z=0.0, toolSpeed=100):
"""
Request robot server to translate in cartesian space.
Parameters:
x(float) : Translate in x direction.\n
y(float) : Translate in y direction.\n
z(float) : Translate in z direction.\n
toolSpeed (float) : Tool speed with which robot need to do move.\n
"""
command = Pose()
current_pose = self.get_tcp(wait=True)
command.x = current_pose.x + x
command.y = current_pose.y + y
command.z = current_pose.z + z
command.roll = current_pose.roll
command.pitch = current_pose.pitch
command.yaw = current_pose.yaw
return self.grpc_client.move_to_pose(command.get_pose(), toolSpeed)
[docs] def move_up(self, z=0.05, toolSpeed=100):
"""
Request robot server to move in up.
Parameters:
z(float) : Translate in z direction in metres.\n
toolSpeed (float) : Tool speed with which robot need to do move.\n
"""
command = Pose()
current_pose = self.get_tcp(wait=True)
command.x = current_pose.x
command.y = current_pose.y
command.z = current_pose.z + z
command.roll = current_pose.roll
command.pitch = current_pose.pitch
command.yaw = current_pose.yaw
return self.grpc_client.move_to_pose(command.get_pose(), toolSpeed)
[docs] def move_down(self, z=-0.05, toolSpeed=100):
"""
Request robot server to move in down.
Parameters:
z(float) : Translate in z direction in metres.\n
toolSpeed (float) : Tool speed with which robot need to do move.\n
"""
return self.move_up(z , toolSpeed)
[docs] def move_pause(self):
"""
Request robot server to pause the current move.
"""
return self.grpc_client.move_pause()
[docs] def move_resume(self):
"""
Request robot server to resume the paused move.
"""
return self.grpc_client.move_resume()
[docs] def move_abort(self):
"""
Request robot server to abort the current move.
"""
return self.grpc_client.move_arbot()
[docs] def change_speed_fraction(self, speedFraction : float):
"""
Change the speed fraction in asynchronous move.
"""
return self.grpc_client.change_speed_fraction(speedFraction)
[docs] def close(self):
self.robot_monitor.close()
self.grpc_client.close()