import grpc
import logging
import owl_client.client.Interfaces.grpc_client.robot_service_pb2 as robotServicePb2
import owl_client.client.Interfaces.grpc_client.robot_service_pb2_grpc as robotServiceGRPPb2
from owl_client.client.Interfaces.utils import TrajectoryPlanMode
[docs]class GRPClient(object):
def __init__(self , host , port):
self.host = host
self.server_port = port
self.logger = logging.getLogger(self.__class__.__name__)
self.channel = grpc.insecure_channel('{}:{}'.format(self.host, self.server_port))
self.stub = robotServiceGRPPb2.RobotServiceStub(self.channel)
[docs] def pay_load(self , payload):
_request = robotServicePb2.PayLoadRequest()
_request.payload = payload
return self.stub.PayLoad(_request)
[docs] def Gravity(self, g_x , g_y , g_z):
_request = robotServicePb2.GravityRequest()
_request.g_x = g_x
_request.g_y = g_y
_request.g_z = g_z
return self.stub.Gravity(_request)
[docs] def power_on(self):
return self.stub.PowerOn(robotServicePb2.PowerOnRequest())
[docs] def power_down(self):
return self.stub.PowerOn(robotServicePb2.PowerDownRequest())
[docs] def enter_teach_mode(self):
return self.stub.EnterTeachMode(robotServicePb2.EnterTeachModeRequest())
[docs] def end_teach_mode(self):
return self.stub.ExitTeachMode(robotServicePb2.ExitTeachModeRequest())
[docs] def fk_request(self, joints):
_request = robotServicePb2.FKRequestRequest()
_request.joints.extend(joints)
response = self.stub.FKRequest(_request)
return response.pose
[docs] def ik_request(self , inital_guess , pose):
_request = robotServicePb2.IKRequestRequest()
_request.initial_guess.extend(inital_guess)
_request.pose.extend(pose)
response = self.stub.IKRequest(_request)
return response.joints
[docs] def set_digital_output(self , pin , state):
_request = robotServicePb2.SetDigitalOutputRequest()
_request.output_pin = pin
_request.pin_status = state
return self.stub.SetDigitalOutput(_request)
[docs] def send_script_program(self, script):
_request = robotServicePb2.SendScriptRequest()
_request.program = script
return self.stub.SendScript(_request)
[docs] def pause_script_program(self):
return self.stub.PauseScript(robotServicePb2.PauseScriptRequest())
[docs] def resume_script_program(self):
return self.stub.ResumeScript(robotServicePb2.ResumeScriptRequest())
[docs] def stop_script_program(self):
return self.stub.StopScript(robotServicePb2.StopScriptRequest())
[docs] def move_to_pose(self , pose , tool_speed , move_type=TrajectoryPlanMode.STRAIGHT , wait=True):
_request = robotServicePb2.MoveToPoseRequest()
_request.pose.extend(pose)
_request.tool_speed = tool_speed
_request.move_type = move_type.value
_request.wait = wait
return self.stub.MoveToPose(_request)
[docs] def move_to_joint(self , joints , joint_speed , wait=True):
_request = robotServicePb2.MoveToJointRequest()
_request.joints.extend(joints)
_request.joint_speed = joint_speed
_request.wait = wait
return self.stub.MoveToJoint(_request)
[docs] def move_process(self, pose_array , tool_speed , blend_factor=80 , wait=True):
if len(pose_array) < 3:
raise Exception("Need three points atleast to perform the process command")
poses = robotServicePb2.MultiArray()
for pose in pose_array:
pose_message = poses.arrays.add()
pose_message.data.extend(pose)
_request = robotServicePb2.MovePosesRequest()
_request.poses_data = poses.SerializeToString()
_request.tool_speed = tool_speed
_request.blend_factor = blend_factor
_request.wait = wait
return self.stub.MovePoses(_request)
[docs] def move_trajectory(self, trajectoryGoal):
positions = robotServicePb2.MultiArray()
velocities = robotServicePb2.MultiArray()
accelerations = robotServicePb2.MultiArray()
for i in range(len(trajectoryGoal.positions)):
position_message = positions.arrays.add()
velocity_message = velocities.arrays.add()
acceleration_message = accelerations.arrays.add()
position_message.data.extend(trajectoryGoal.positions[i])
velocity_message.data.extend(trajectoryGoal.velocities[i])
acceleration_message.data.extend(trajectoryGoal.accelerations[i])
_request = robotServicePb2.MoveTrajectoryRequest()
_request.joint_points = positions.SerializeToString()
_request.velocity_points = velocities.SerializeToString()
_request.acceleration_points = accelerations.SerializeToString()
return self.stub.MoveTrajectory(_request)
[docs] def move_pause(self):
return self.stub.MovePause(robotServicePb2.MovePauseRequest())
[docs] def move_resume(self):
return self.stub.MoveResume(robotServicePb2.MoveResumeRequest())
[docs] def move_arbot(self):
return self.stub.MoveAbort(robotServicePb2.MoveAbortRequest())
[docs] def change_speed_fraction(self , speed_fraction):
return self.stub.ChangeSpeedFraction(robotServicePb2.ChangeSpeedFractionRequest(speed_fraction=speed_fraction))
[docs] def close(self):
self.channel.close()