import socket import json import os import math import numpy as np from typing import Literal from pprint import pprint import threading import pybullet as p import pybullet_industrial as pi from logger import logger from robot.func import * from robot.prepare_data import PrepareRobotData from robot.urdf_manager import UrdfManager from robot.socket_manager import SocketManager # os.environ["LIBGL_ALWAYS_SOFTWARE"] = "1" class SocketRobotArm: global_speed = 10 physical_speed = 10 # laser_id = 15 laser_id = 14 filename = "axis" urdf_filename = "sample" pass_size = 4 connected_status = Literal["connected", "not_connected", "error"] _axis_coordinates = [] _world_coordinates = [] _command_count = [] command_type = "base" command_data = None motion_type = "reset" motion_type = "normal" def __init__(self, application_path, *args, **kwargs): self.socket = None self.host = None self.port = 9760 self.slave_id = None self.status: SocketRobotArm.connected_status = "not_connected" self.q_app = None self.imitate_point = None robot_start_position = [0, -0, 0] self.prepare_data = PrepareRobotData() self.socket_manager = SocketManager() self.urdf_manager = UrdfManager( robot_start_position, application_path, self.palletizing, ) self.constraint_id = None def __exit__(self, exc_type, exc_value, traceback): logger.info("exiting") self.socket.close() def start(self, type="DIRECT"): self.urdf_manager.run_pybullet(type) def start_loop(self, urdf): self.urdf_manager.start_loop(urdf) self.body_id = self.urdf_manager.body_id self.num_joints = p.getNumJoints(self.body_id) def load_palletizing(self): self.urdf_manager.load_palletizing() def start_moving(self): self.urdf_manager.conveyor.moving = True def get_pybullet_image(self): return self.urdf_manager.get_pybullet_image() def close(self): self.socket_manager.close() def connect(self, host, slave_id): self.socket_manager.connect(host) with open("delta.json", "w") as f: pass def get_status(self): return self.socket_manager.status def motionFund(self, radians): if self.motion_type == "reset": for index in radians: p.resetJointState(self.urdf_manager.body_id, index, radians) time.sleep(0.3) else: p.setJointMotorControlArray( self.urdf_manager.body_id, [i for i in range(len(radians))], p.POSITION_CONTROL, radians, ) time.sleep(1) def upd_model(self): threading.Thread(target=self.upd_model_func, daemon=True).start() def upd_model_func(self): self.get_coordinates("axis") self.set_text(text=f"Координаты осей {self.axis_coordinates}") time.sleep(0.5) self.get_coordinates("world") self.set_text(text=f"Мировые координаты {self.world_coordinates}") time.sleep(0.5) self._fetch_command_count() time.sleep(0.5) axisJointPoses = self.axis_coordinates self.motionFund(np.radians(axisJointPoses)) time.sleep(0.5) if False: coordinates = [float(p) * 0.001 for p in self.world_coordinates[:3]] angles = [math.radians(p) for p in self.world_coordinates[3:6]] worldJointPoses = self.convert_to_joint_base(coordinates, angles) for i in range(self.num_joints): v = worldJointPoses[i] if i < len(worldJointPoses) else 0 self.motionFund(worldJointPoses) time.sleep(0.5) calcWorldPosition = p.getLinkState( self.urdf_manager.body_id, self.num_joints - 1, computeForwardKinematics=True, ) calcWorldPosition = list( [p * 1000 for p in calcWorldPosition[0]] + [ np.degrees(p) for p in p.getEulerFromQuaternion(calcWorldPosition[1]) ] ) time.sleep(0.5) position_diff = np.linalg.norm( self.world_coordinates[:3] - np.array(calcWorldPosition[:3]) ) print( "Разница между расчетом по модели и мировыми координатами:", position_diff, ) self.set_text(text=f"Команд в очереди {self.command_count}") time.sleep(0.5) def cycle_base(self): self.upd_model() self.send_data(self.set_global_speed()) self.set_text(text=f"Установили глобальную скорость {self.global_speed}") time.sleep(0.5) def cycle_start(self): self.send_data(self.start_cycle()) self.set_text(text=f"Старт одиночного цикла") time.sleep(0.5) if self.command_type == "base": commands = self.steps_from_file() if self.command_type == "calc": commands = self.convert_point_to_free() if self.command_type == "pallette": commands = self.get_palletizing_json() with open(f"log/{self.command_type}.json", "w") as myfile: myfile.write(json.dumps(commands)) self.add_rcc_list = ( [self.set_physical_speed(True), self.set_output_laser(True)] + commands + [self.set_physical_speed(False), self.set_output_laser(False)] ) step = 4 empty = 1 for i in range(0, len(self.add_rcc_list), step): self.command_data = f"Отправка данных {i}...{i+step-1}" self.send_data(make_addrcc_data(self.add_rcc_list[i : i + step], empty)) empty = 0 time.sleep(0.05) def imitate(self): threading.Thread(target=self.imitate_func, daemon=True).start() def palletizing(self, position, box_id, counter): dir_wall = (self.urdf_manager.conv_direction + 1) % 2 near_position = position[:] near_position[dir_wall] -= 0.4 object_position = position[:] object_position[dir_wall] -= 0.02 pallet_position = self.urdf_manager.packing[ counter % len(self.urdf_manager.packing) ] logger.info(f"pallete {pallet_position}") base1_position = pallet_position[:] base1_position[0] -= 0.2 base1_position[2] += 0.2 base_position = pallet_position[:] base_position[0] -= 0.2 drop_position = pallet_position[:] drop_position[0] -= 0.4 drop_position[2] += 0.4 json_res = [] tool_pos = [0, 0, 0] steps = [ {"type": "move", "coordinates": near_position}, {"type": "move", "coordinates": object_position}, {"type": "create_constraint"}, {"type": "move", "coordinates": base1_position}, {"type": "move", "coordinates": base_position}, {"type": "move", "coordinates": pallet_position}, {"type": "remove_constraint"}, {"type": "move", "coordinates": base_position}, {"type": "move", "coordinates": base1_position}, {"type": "move", "coordinates": drop_position}, ] constraint_id = None for step in steps: if step["type"] == "move": # logger.info(step["coordinates"]) jointPoses = self.convert_to_joint_base( step["coordinates"], np.radians(tool_pos), ) json_res.append( self.prepare_data.make_step( "free", np.degrees(jointPoses), [0, 0, 0, 0, 0, 0] ) ) self.motionFund(jointPoses) # time.sleep(2) if step["type"] == "create_constraint": link_state = p.getLinkState(self.body_id, self.num_joints - 1) orientation = link_state[1] constraint_id = p.createConstraint( parentBodyUniqueId=self.body_id, parentLinkIndex=self.num_joints - 1, childBodyUniqueId=box_id, childLinkIndex=-1, jointType=p.JOINT_FIXED, jointAxis=[0, 0, 0], parentFramePosition=[0, 0, 0], childFramePosition=[0, 0.2, 0], parentFrameOrientation=p.getQuaternionFromEuler( np.radians([0, 0, 90]) ), childFrameOrientation=orientation, ) if step["type"] == "remove_constraint": time.sleep(0.5) p.removeConstraint(constraint_id) # time.sleep(0.5) # p.resetBasePositionAndOrientation( # box_id, # pallet_position, # p.getQuaternionFromEuler(np.radians([0, 0, 0])), # ) with open(f"data/palletizing.json", "w") as myfile: myfile.write(json.dumps(json_res)) self.urdf_manager.conveyor.conveyor_stopped = False def set_text(self, text): logger.info(text) def send_data(self, data): return self.socket_manager.send_data(data) def get_coordinates(self, coord_type: Literal["axis", "world"] = "axis"): request = make_query_data([f"{coord_type}-{i}" for i in range(6)]) result = self.send_data(request) if not result: return data = [float(i) for i in result] if coord_type == "axis": self._axis_coordinates = data elif coord_type == "world": self._world_coordinates = data @property def axis_coordinates(self): return self._axis_coordinates @property def world_coordinates(self): return self._world_coordinates def _fetch_command_count(self): request = make_query_data(["RemoteCmdLen"]) result = self.send_data(request) self._command_count = result @property def command_count(self): return self._command_count def set_command_type(self, data): self.command_type = data def get_command_type(self): return self.command_type def get_command_data(self): return self.command_data def get_imitate_point(self): return self.imitate_point def set_global_speed(self): return make_command_data(["modifyGSPD", str(self.global_speed * 10)]) def start_cycle(self): return make_command_data(["actionSingleCycle"]) def set_physical_speed(self, status: bool = False): return ( { "oneshot": "0", "action": "51", "isUse": str(int(status)), "speed": str(self.physical_speed * 1000), }, ) def set_output_laser(self, status: bool = False): return ( { "oneshot": "0", "action": "200", "type": "0", "io_status": str(int(status)), "point": str(self.laser_id), }, ) def imitate_func(self): points = self.convert_point_to_free() for point in points: jointPoses = [np.radians(float(point[f"m{i}"])) for i in range(6)] self.motionFund(jointPoses) def get_palletizing_json(self): with open(f"data/palletizing.json", "r") as fp: data = json.load(fp) return data def convert_point_to_free(self): points = self.steps_from_file() res = [] for i, point in enumerate(points): if point["action"] == "10": self.imitate_point = i points = [ point["m0"], point["m1"], point["m2"], ] angles = [ point["m3"], point["m4"], point["m5"], ] jointPoses = self.convert_to_joint_base( [float(p) * 0.001 for p in points], np.radians([float(x) for x in angles]), ) res.append( self.prepare_data.make_step( "free", np.degrees(jointPoses), [0, 0, 0, 0, 0, 0] ) ) return res def convert_to_joint_base(self, coordinates, angles): lower_limits = [] upper_limits = [] for i in range(self.num_joints): limit = p.getJointInfo(self.body_id, i) lower_limits.append(limit[8]) upper_limits.append(limit[9]) joint_angles = p.calculateInverseKinematics( self.body_id, endEffectorLinkIndex=self.num_joints - 1, targetPosition=[float(c) for c in coordinates], targetOrientation=p.getQuaternionFromEuler(angles), ) return joint_angles def steps_from_file(self): if not self.world_coordinates: return [] result = [] with open(f"data/{self.filename}.nc.result", "r") as fp: for line in fp: data = line.strip().split(" ") prep = {} axis = ["X", "Y", "Z", "U", "V", "W"] for item in data: if item and item[0] and item[0] in axis: prep[item[:1]] = float(item[1:]) if len(prep) > 0: r = self.prepare_data.make_step( "line", ( prep.get("X", 0), prep.get("Y", 0), prep.get("Z", 0), prep.get("U", 0), prep.get("V", 0), prep.get("W", 0), ), self.world_coordinates, ) # print(r) result.append(r) return result