import socket import json import os import math import numpy as np import sys from typing import Literal from pprint import pprint import threading import pybullet as p import pybullet_industrial as pi from robot.func import * from logger import logger # os.environ["LIBGL_ALWAYS_SOFTWARE"] = "1" class PrepareRobotData: line_speed = 100.0 line_smooth = 0 line_tool = 0 def make_step(self, type, point, start_coordinates): step = { "oneshot": "1", "delay": "0.0", "speed": str(self.line_speed), "smooth": str(self.line_smooth), "coord": "0", "tool": str(self.line_tool), "ckStatus": "0x3F", } if type == "line" or type == "free": pairs = zip(start_coordinates, point) data = [] for pair in pairs: data.append(round(sum(pair), 3)) m0, m1, m2, m3, m4, m5 = data if type == "line": step.update({"action": "10"}) if type == "free": step.update({"action": "4"}) step.update({"m0": m0, "m1": m1, "m2": m2, "m3": m3, "m4": m4, "m5": m5}) step.update({"m6": 0, "m7": 0}) elif type == "curve": pairs = zip(self.world_coordinates, point[:5]) m0, m1, m2, m3, m4, m5 = [round(sum(i), 3) for i in pairs] pairs_p = zip(self.world_coordinates, point[6:]) m0_p, m1_p, m2_p, m3_p, m4_p, m5_p = [round(sum(i), 3) for i in pairs_p] step.update({"action": "17"}) step.update({"m0": m0, "m1": m1, "m2": m2, "m3": m3, "m4": m4, "m5": m5}) step.update({"m6": 0, "m7": 0}) step.update( { "m0_p": m0_p, "m1_p": m1_p, "m2_p": m2_p, "m3_p": m3_p, "m4_p": m4_p, "m5_p": m5_p, } ) step.update({"m6_p": 0, "m7_p": 0}) for s in step: step[s] = str(step[s]) return step class SocketManager: def __init__(self): self.host = None self.port = 9760 self.socket = None self.status: Literal["connected", "not_connected", "error"] = "not_connected" def connect(self, host): self.host = host if self.socket is None: self.socket = socket.socket() logger.info(f"trying connect to {(self.host, self.port)}") self.socket.connect((self.host, self.port)) self.status = "connected" def close(self): if self.socket: self.socket.close() self.socket = None self.status = "not_connected" def send_data(self, data): if not self.socket: return try: self.socket.send(str.encode(json.dumps(data))) response_data = self.socket.recv(1024) response = json.loads(response_data) if data["reqType"] == "query": return response["queryData"] elif data["reqType"] == "command": return response["cmdReply"] elif data["reqType"] == "AddRCC" and "cmdReply" in response.keys(): return response["cmdReply"] else: pprint(response_data) return json.loads(response_data) except Exception as e: logger.error(f"Error sending data: {e}") return None class UrdfManager: urdf_filename = None physics_client = None body_id = None def __init__(self, robot_start_position, application_path): self.robot_start_position = robot_start_position self.application_path = application_path def start_loop(self, urdf): self.urdf_filename = urdf p.resetSimulation() self.load_models() def run_pybullet(self, type="DIRECT"): self.physics_client = p.connect(getattr(p, type)) logger.info(f"Connect to {self.physics_client} by {type}") def load_models(self): # p.loadURDF("urdf/plane.urdf", physicsClientId=self.physics_client) urdf_path = os.path.join( self.application_path, "urdf", f"{self.urdf_filename}.urdf" ) self.body_id = p.loadURDF(urdf_path, self.robot_start_position) time.sleep(1) def get_pybullet_image(self): if self.physics_client is None: return width, height, rgb, _, _ = p.getCameraImage( width=500, height=500, viewMatrix=p.computeViewMatrix( cameraEyePosition=[4, -4, 1.5], # cameraEyePosition=[4,-4,4], cameraTargetPosition=[0, 0, 0], # Центр фокусировки камеры cameraUpVector=[0, 0, 1], # Направление вверх ), projectionMatrix=p.computeProjectionMatrixFOV( fov=60.0, aspect=1.0, nearVal=0.1, farVal=10.0 ), # renderer=p.ER_TINY_RENDERER, physicsClientId=self.physics_client, ) return (rgb, width, height) class SocketRobotArm: global_speed = 10 physical_speed = 10 # laser_id = 15 laser_id = 14 filename = "axis" urdf_filename = "sample" pass_size = 4 Status = Literal["connected", "not_connected", "error"] axis_coordinates = [] world_coordinates = [] remote_command_count = [] command_type = "base" command_data = None def __init__(self, application_path, *args, **kwargs): self.socket = None self.host = None self.port = 9760 self.slave_id = None self.status: SocketRobotArm.Status = "not_connected" self.q_app = None self.imitate_point = None robot_start_position = [0, -0, 0] self.prepare_data = PrepareRobotData() self.socket_manager = SocketManager() self.urdf_manager = UrdfManager( robot_start_position=robot_start_position, application_path=application_path ) def __exit__(self, exc_type, exc_value, traceback): logger.info("exiting") self.socket.close() def start(self, type="DIRECT"): self.urdf_manager.run_pybullet(type) def start_loop(self, urdf): self.urdf_manager.start_loop(urdf) def load_models(self): self.urdf_manager.load_models() self.body_id = self.urdf_manager.body_id def get_pybullet_image(self): return self.urdf_manager.get_pybullet_image() def close(self): self.socket_manager.close() def connect(self, host, slave_id): self.socket_manager.connect(host) with open("delta.json", "w") as f: pass def get_status(self): return self.socket_manager.status def upd_model(self): threading.Thread(target=self.upd_model_func, daemon=True).start() def upd_model_func(self): # logger.info("UPDATE MODEL FUNC")\ mode = "by_joint" # mode = "by_world" self.get_axis() self.set_text(text=f"Координаты осей {self.axis_coordinates}") self.get_world() self.set_text(text=f"Мировые координаты {self.world_coordinates}") num_joints = p.getNumJoints(self.urdf_manager.body_id) axisJointPoses = self.axis_coordinates for i in range(num_joints): p.resetJointState( self.urdf_manager.body_id, i, math.radians(axisJointPoses[i]) ) time.sleep(1) coordinates = self.world_coordinates[:3] angles = [ self.world_coordinates[1], self.world_coordinates[2], self.world_coordinates[3], ] worldJointPoses = self.convert_to_joint_base( [float(p) * 0.001 for p in coordinates], [math.radians(p) for p in angles] ) for i in range(num_joints): p.resetJointState(self.urdf_manager.body_id, i, worldJointPoses[i]) calcWorldPosition = p.getLinkState( self.urdf_manager.body_id, num_joints - 1, computeForwardKinematics=True ) calcWorldPosition = list( [p * 1000 for p in calcWorldPosition[0]] + [np.degrees(p) for p in p.getEulerFromQuaternion(calcWorldPosition[1])] ) self._get_command_count() self.set_text(text=f"Команд в очереди {self.remote_command_count}") time.sleep(0.5) def cycle_base(self): self.upd_model() self.send_data(self.set_global_speed()) self.set_text(text=f"Установили глобальную скорость {self.global_speed}") time.sleep(0.5) def cycle_start(self): self.send_data(self.start_cycle()) self.set_text(text=f"Старт одиночного цикла") time.sleep(0.5) commands = ( self.steps_from_file() if self.command_type == "base" else self.convert_file_to_joint() ) with open(f"log/{self.command_type}.json", "w") as myfile: myfile.write(json.dumps(commands)) self.add_rcc_list = ( [self.set_physical_speed(True), self.set_output_laser(True)] + commands + [self.set_physical_speed(False), self.set_output_laser(False)] ) step = 4 empty = 1 for i in range(0, len(self.add_rcc_list), step): self.command_data = f"Отправка данных {i}...{i+step-1}" self.send_data(make_addrcc_data(self.add_rcc_list[i : i + step], empty)) empty = 0 time.sleep(0.05) def imitate(self): threading.Thread(target=self.imitate_func, daemon=True).start() def set_text(self, text): logger.info(text) def send_data(self, data): return self.socket_manager.send_data(data) def get_axis(self): axis_coord_raw = self.send_data( make_query_data( ["axis-0", "axis-1", "axis-2", "axis-3", "axis-4", "axis-5"] ) ) self.axis_coordinates = [float(i) for i in axis_coord_raw] print("start_axis_coordinates", self.axis_coordinates) def get_axis_coordinates(self): return self.axis_coordinates def get_world(self): world_coord_raw = self.send_data( make_query_data( ["world-0", "world-1", "world-2", "world-3", "world-4", "world-5"] ) ) self.world_coordinates = [float(i) for i in world_coord_raw] print("start_world_coordinates", self.world_coordinates) def get_world_coordinates(self): return self.world_coordinates def _get_command_count(self): res = self.send_data(make_query_data(["RemoteCmdLen"])) self.remote_command_count = res print(res) def get_command_count(self): return self.remote_command_count def set_command_type(self, data): self.command_type = data def get_command_type(self): return self.command_type def get_command_data(self): return self.command_data def get_imitate_point(self): return self.imitate_point def set_global_speed(self): # Изменили глобальную скорость на global_speed% return make_command_data(["modifyGSPD", str(self.global_speed * 10)]) def start_cycle(self): return make_command_data(["actionSingleCycle"]) def set_physical_speed(self, status: bool = False): return ( { "oneshot": "0", "action": "51", "isUse": str(int(status)), "speed": str(self.physical_speed * 1000), }, ) def set_output_laser(self, status: bool = False): return ( { "oneshot": "0", "action": "200", "type": "0", "io_status": str(int(status)), "point": str(self.laser_id), }, ) def imitate_func(self): points = self.steps_from_file() for i, point in enumerate(points): if point["action"] == "10": self.imitate_point = i points = [ point["m0"], point["m1"], point["m2"], ] angles = [ point["m3"], point["m4"], point["m5"], ] jointPoses = self.convert_to_joint_base( [float(p) * 0.001 for p in points], angles ) num_joints = p.getNumJoints(self.urdf_manager.body_id) for i in range(num_joints): p.resetJointState(self.urdf_manager.body_id, i, jointPoses[i]) time.sleep(1) def convert_to_joint_base(self, coordinates, angles): body_id = 0 num_joints = p.getNumJoints(body_id) lower_limits = [] upper_limits = [] for i in range(num_joints): limit = p.getJointInfo(body_id, i) lower_limits.append(limit[8]) upper_limits.append(limit[9]) joint_angles = p.calculateInverseKinematics( body_id, endEffectorLinkIndex=num_joints - 1, targetPosition=[float(c) for c in coordinates], targetOrientation=angles, # lowerLimits=lower_limits, # upperLimits=upper_limits, # jointRanges=[4, 4, 4, 4, 4, 4], # restPoses=[ # math.radians(0.1873), # math.radians(59.059), # math.radians(-99.4105), # math.radians(-0.2214), # math.radians(41.7877), # math.radians(-0.0011), # ], ) return joint_angles def convert_file_to_joint(self): result = [] with open(f"data/{self.filename}.nc.result", "r") as fp: for line in fp: data = line.strip().split(" ") prep = {} for item in data: prep[item[:1]] = float(item[1:]) pj = list( self.convert_to_joint_base( coordinates=[ f for f in ( prep.get("X", 0) + self.world_coordinates[0], prep.get("Y", 0) + self.world_coordinates[1], prep.get("Z", 0) + self.world_coordinates[2], ) ], angles=[ prep.get("U", 0) + self.world_coordinates[3], prep.get("V", 0) + self.world_coordinates[4], prep.get("W", 0) + self.world_coordinates[5], ], ) ) points = np.degrees(pj) start_position = [0, 60, -100, 0, 40, 0] start_position = [0, 0, 0, 0, 0, 0] result.append( self.prepare_data.make_step("free", points, start_position) ) return result def steps_from_file(self): result = [] with open(f"data/{self.filename}.nc.result", "r") as fp: for line in fp: data = line.strip().split(" ") prep = {} for item in data: prep[item[:1]] = float(item[1:]) result.append( self.prepare_data.make_step( "line", ( prep.get("X", 0), prep.get("Y", 0), prep.get("Z", 0), prep.get("U", 0), prep.get("V", 0), prep.get("W", 0), ), self.world_coordinates, ) ) return result