Source code for owl_client.client.Interfaces.socket.owl_monitor_socket

import logging
import socket
import threading
import time

import msgpack

[docs]class RobotMonitorSocket(threading.Thread): def __init__(self, host_address: str, port: int): super().__init__() self.name = "RobotMontiorSocket" self.daemon = True self.logger = logging.getLogger(self.__class__.__name__) self.__host_address = host_address self.__port = port self._stop_event = True self._data_event = threading.Condition() self._data_access = threading.Lock() self._monitor_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self._monitor_sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) self._monitor_sock.connect((self.__host_address, self.__port)) self._timestamp = None # Data Variables self._ctrl_time_stamp = None self._q_actual = None self._tcp_actual = None self._io_status = None self._program_status = None self._program_state_id = None self._robot_mode = None self._robot_code = None self._last_ctrl_time_stamp = 0 self.__recv_time = 0 def __recv_bytes(self, nBytes): recv_time = 0 data = b'' while len(data) < nBytes: data += self._monitor_sock.recv(nBytes - len(data)) if recv_time == 0: recv_time = time.time() self.__recv_time = recv_time return data
[docs] def wait(self): with self._data_event: self._data_event.wait()
[docs] def q_actual(self , wait=False): if wait: self.wait() with self._data_access: return self._q_actual
getQActual = q_actual
[docs] def tcp_actual(self, wait=False): if wait: self.wait() with self._data_access: return self._tcp_actual
getTCPActual = tcp_actual
[docs] def io_status(self, wait=False): if wait: self.wait() with self._data_access: return self._io_status
getIOStatus = io_status
[docs] def program_status(self,wait=False): if wait: self.wait() with self._data_access: return self._program_status
getProgramStatus = program_status
[docs] def program_state_id(self , wait=False): if wait: self.wait() with self._data_access: return self._program_state_id
getProgramStateID = program_state_id
[docs] def robot_mode(self , wait=False): if wait: self.wait() with self._data_access: return self._robot_mode
getRobotMode = robot_mode
[docs] def robot_code(self , wait=False): if wait: self.wait() with self._data_access: return self._robot_code
getRobotCode = robot_code def __recv_monitor_data(self): header_bytes = self.__recv_bytes(4) timestamp = self.__recv_time packet_size = int.from_bytes(header_bytes, byteorder="big") self.logger.info('Received header telling that package is %s bytes long',packet_size) packet_payload = msgpack.unpackb(self.__recv_bytes(packet_size)) with self._data_access: self._timestamp = timestamp self._ctrl_time_stamp = packet_payload[0] if self._last_ctrl_time_stamp != 0 and ( self._ctrl_time_stamp - self._last_ctrl_time_stamp) > 0.1: self.logger.warn("Error the controller failed to send us a packet: time since last packet %s s ", self._ctrl_time_stamp - self._last_ctrl_time_stamp) self._last_ctrl_time_stamp = self._ctrl_time_stamp self._q_actual = packet_payload[1] self._tcp_actual = packet_payload[2] self._io_status = packet_payload[3] self._program_status = packet_payload[4] self._program_state = packet_payload[5] self._robot_mode = packet_payload[7] self._robot_code = packet_payload[8] with self._data_event: self._data_event.notifyAll()
[docs] def get_all_data(self, wait=True): if wait: self.wait() with self._data_access: return dict( timestamp=self._timestamp, ctrltimestamp=self._ctrl_time_stamp, q_actual = self._q_actual, tcp_actual =self._tcp_actual, io_status = self._io_status, program_status = self._program_status, program_state_id = self._program_state_id, robot_mode = self._robot_mode, robot_code = self._robot_code)
getALLData = get_all_data
[docs] def stop(self): self._stop_event = True
[docs] def close(self): self.stop() self.join()
[docs] def run(self): self._stop_event = False while not self._stop_event: self.__recv_monitor_data() self._monitor_sock.close()