Source code for owl_client.client.Interfaces.grpc_client.robot_client

import grpc
import logging

import owl_client.client.Interfaces.grpc_client.robot_service_pb2 as robotServicePb2
import owl_client.client.Interfaces.grpc_client.robot_service_pb2_grpc as robotServiceGRPPb2
from   owl_client.client.Interfaces.utils import TrajectoryPlanMode

[docs]class GRPClient(object): def __init__(self , host , port): self.host = host self.server_port = port self.logger = logging.getLogger(self.__class__.__name__) self.channel = grpc.insecure_channel('{}:{}'.format(self.host, self.server_port)) self.stub = robotServiceGRPPb2.RobotServiceStub(self.channel)
[docs] def pay_load(self , payload): _request = robotServicePb2.PayLoadRequest() _request.payload = payload return self.stub.PayLoad(_request)
[docs] def TcpTransform(self , transform): _request = robotServicePb2.TcpTransformRequest() _request.tcp_transform.extend(transform) return self.stub.TcpTransform(_request)
[docs] def CentreOfTool(self, tool_x , tool_y , tool_z): _request = robotServicePb2.CentreOfToolRequest() _request.tool_x = tool_x _request.tool_y = tool_y _request.tool_z = tool_z return self.stub.CentreOfTool(_request)
[docs] def Gravity(self, g_x , g_y , g_z): _request = robotServicePb2.GravityRequest() _request.g_x = g_x _request.g_y = g_y _request.g_z = g_z return self.stub.Gravity(_request)
[docs] def power_on(self): return self.stub.PowerOn(robotServicePb2.PowerOnRequest())
[docs] def power_down(self): return self.stub.PowerOn(robotServicePb2.PowerDownRequest())
[docs] def enter_teach_mode(self): return self.stub.EnterTeachMode(robotServicePb2.EnterTeachModeRequest())
[docs] def end_teach_mode(self): return self.stub.ExitTeachMode(robotServicePb2.ExitTeachModeRequest())
[docs] def fk_request(self, joints): _request = robotServicePb2.FKRequestRequest() _request.joints.extend(joints) response = self.stub.FKRequest(_request) return response.pose
[docs] def ik_request(self , inital_guess , pose): _request = robotServicePb2.IKRequestRequest() _request.initial_guess.extend(inital_guess) _request.pose.extend(pose) response = self.stub.IKRequest(_request) return response.joints
[docs] def set_digital_output(self , pin , state): _request = robotServicePb2.SetDigitalOutputRequest() _request.output_pin = pin _request.pin_status = state return self.stub.SetDigitalOutput(_request)
[docs] def send_script_program(self, script): _request = robotServicePb2.SendScriptRequest() _request.program = script return self.stub.SendScript(_request)
[docs] def pause_script_program(self): return self.stub.PauseScript(robotServicePb2.PauseScriptRequest())
[docs] def resume_script_program(self): return self.stub.ResumeScript(robotServicePb2.ResumeScriptRequest())
[docs] def stop_script_program(self): return self.stub.StopScript(robotServicePb2.StopScriptRequest())
[docs] def move_to_pose(self , pose , tool_speed , move_type=TrajectoryPlanMode.STRAIGHT , wait=True): _request = robotServicePb2.MoveToPoseRequest() _request.pose.extend(pose) _request.tool_speed = tool_speed _request.move_type = move_type.value _request.wait = wait return self.stub.MoveToPose(_request)
[docs] def move_to_joint(self , joints , joint_speed , wait=True): _request = robotServicePb2.MoveToJointRequest() _request.joints.extend(joints) _request.joint_speed = joint_speed _request.wait = wait return self.stub.MoveToJoint(_request)
[docs] def move_process(self, pose_array , tool_speed , blend_factor=80 , wait=True): if len(pose_array) < 3: raise Exception("Need three points atleast to perform the process command") poses = robotServicePb2.MultiArray() for pose in pose_array: pose_message = poses.arrays.add() pose_message.data.extend(pose) _request = robotServicePb2.MovePosesRequest() _request.poses_data = poses.SerializeToString() _request.tool_speed = tool_speed _request.blend_factor = blend_factor _request.wait = wait return self.stub.MovePoses(_request)
[docs] def move_trajectory(self, trajectoryGoal): positions = robotServicePb2.MultiArray() velocities = robotServicePb2.MultiArray() accelerations = robotServicePb2.MultiArray() for i in range(len(trajectoryGoal.positions)): position_message = positions.arrays.add() velocity_message = velocities.arrays.add() acceleration_message = accelerations.arrays.add() position_message.data.extend(trajectoryGoal.positions[i]) velocity_message.data.extend(trajectoryGoal.velocities[i]) acceleration_message.data.extend(trajectoryGoal.accelerations[i]) _request = robotServicePb2.MoveTrajectoryRequest() _request.joint_points = positions.SerializeToString() _request.velocity_points = velocities.SerializeToString() _request.acceleration_points = accelerations.SerializeToString() return self.stub.MoveTrajectory(_request)
[docs] def move_pause(self): return self.stub.MovePause(robotServicePb2.MovePauseRequest())
[docs] def move_resume(self): return self.stub.MoveResume(robotServicePb2.MoveResumeRequest())
[docs] def move_arbot(self): return self.stub.MoveAbort(robotServicePb2.MoveAbortRequest())
[docs] def change_speed_fraction(self , speed_fraction): return self.stub.ChangeSpeedFraction(robotServicePb2.ChangeSpeedFractionRequest(speed_fraction=speed_fraction))
[docs] def close(self): self.channel.close()