import socket import json import os import sys import math from pprint import pprint import tkinter import threading import pybullet as p import pybullet_data os.environ["LIBGL_ALWAYS_SOFTWARE"] = "1" from func import * class SocketRobotArm: line_speed = 100.0 line_smooth = 9 line_tool = 1 global_speed = 100 physical_speed = 50 # laser_id = 15 laser_id = 14 # filename = 'half-sphere-no-angle' filename = "test" urdf_filename = "sample" # urdf_filename = "fanucM16ib" pass_size = 4 def __init__(self, *args, **kwargs): self.socket = socket.socket() self.host = MODBUS_SERVER_HOST self.port = 9760 if len(sys.argv) and "--test" in sys.argv: self.host = "127.0.0.1" self.port = 65432 self.tkinter_root = tkinter.Tk() self.tkinter_info_label = tkinter.Label(self.tkinter_root) self.tkinter_exit = tkinter.Button( self.tkinter_root, text="Exit", command=self.close ) self.tkinter_start_cycle = tkinter.Button( self.tkinter_root, text="Start cycle", command=self.cycle_start ) self.tkinter_imitate = tkinter.Button( self.tkinter_root, text="Imitate", command=self.imitate ) self.physics_client = None self.body_id = None threading.Thread(target=self.run_pybullet, daemon=True).start() self.socket.connect((self.host, self.port)) self.cycle_base() self.tkinter_root.mainloop() def __exit__(self, exc_type, exc_value, traceback): print("exiting") self.socket.close() def run_pybullet(self): self.physics_client = p.connect(p.GUI) p.setGravity(0, 0, -9.81, physicsClientId=self.physics_client) p.setAdditionalSearchPath(pybullet_data.getDataPath()) self.body_id = p.loadURDF( f"urdf/{self.urdf_filename}.urdf", [0, 0, 0], useFixedBase=True, globalScaling=1, ) while True: p.stepSimulation() time.sleep(1 / 24) def close(self): print("close") # self.tkinter_info_label.destroy() self.tkinter_root.quit() self.socket.close() self.socket = None # sys.exit() def cycle_base(self): self.tkinter_root.geometry("700x400") self.tkinter_imitate.pack(pady=20) self.tkinter_start_cycle.pack(pady=20) self.tkinter_exit.pack(pady=20) self.get_axis() self.set_text(text=f"Координаты осей {self.start_axis_coordinates}") time.sleep(0.5) self.set_joint([math.radians(c) for c in self.start_axis_coordinates]) self.get_world() self.set_text(text=f"Мировые координаты {self.start_world_coordinates}") time.sleep(0.5) # self.set_joint(self.convert_to_joint(self.start_world_coordinates[0:3])) self.get_command_count() self.set_text(text=f"Команд в очереди {self.remote_command_count}") time.sleep(0.5) self.send_data(self.set_global_speed()) self.set_text(text=f"Установили глобальную скорость {self.global_speed}") time.sleep(0.5) def cycle_start(self): self.send_data(self.start_cycle()) self.set_text(text=f"Старт одиночного цикла") time.sleep(0.5) self.add_rcc_list = ( [self.set_physical_speed(True), self.set_output_laser(True)] + self.steps_from_file() + [self.set_physical_speed(False), self.set_output_laser(False)] ) self.tkinter_info_label.config(text=f"Отправка данных...") self.tkinter_info_label.pack() self.tkinter_root.update() step = 2 for i in range(0, len(self.add_rcc_list), step): if not self.tkinter_info_label: return if not self.socket: return print(f"Отправка данных {i}...{i+step-1}") # self.update_text(m, text=f"Отправка данных {i}...{i+step-1}") self.tkinter_info_label.config(text=f"Отправка данных {i}...{i+step-1}") self.tkinter_root.update() self.send_data( make_addrcc_data(self.add_rcc_list[i : i + step], int(not i)) ) time.sleep(0.5) def imitate(self): points = self.steps_from_file() for p in points: if p["action"] == "10": angles = self.convert_to_joint( [float(p) for p in [p["m0"], p["m1"], p["m2"]]] ) self.set_joint(angles) time.sleep(0.5) def convert_to_joint(self, coordinates): num_joints = p.getNumJoints(self.body_id) joint_info = [p.getJointInfo(self.body_id, i) for i in range(num_joints)] joint_angles = p.calculateInverseKinematics( self.body_id, endEffectorLinkIndex=num_joints - 1, targetPosition=coordinates, ) print(joint_angles) return joint_angles def set_joint(self, coordinates): num_joints = p.getNumJoints(self.body_id) for joint_index in range(0, num_joints): # if not joint_index in coordinates: # return p.setJointMotorControl2( bodyUniqueId=self.body_id, jointIndex=joint_index, controlMode=p.POSITION_CONTROL, targetPosition=coordinates[joint_index], ) self.tkinter_info_label.config( text=f"Ось {joint_index} повернуть на {coordinates[joint_index]} градусов" ) self.tkinter_info_label.pack() self.tkinter_root.update() time.sleep(0.01) def set_text(self, text): label = tkinter.Label(self.tkinter_root, text=text) label.pack() self.tkinter_root.update() def send_data(self, data): if not self.socket: return self.socket.send(str.encode(json.dumps(data))) response_data = self.socket.recv(1024) response = json.loads(response_data) if data["reqType"] == "query": return response["queryData"] elif data["reqType"] == "command": return response["cmdReply"] elif data["reqType"] == "AddRCC" and "cmdReply" in response.keys(): return response["cmdReply"] else: pprint(response) def get_axis(self): axis_coord_raw = self.send_data( make_query_data( ["axis-0", "axis-1", "axis-2", "axis-3", "axis-4", "axis-5"] ) ) self.start_axis_coordinates = [float(i) for i in axis_coord_raw] print("start_axis_coordinates", self.start_axis_coordinates) def get_world(self): world_coord_raw = self.send_data( make_query_data( ["world-0", "world-1", "world-2", "world-3", "world-4", "world-5"] ) ) self.start_world_coordinates = [float(i) for i in world_coord_raw] print("start_world_coordinates", self.start_world_coordinates) def get_command_count(self): res = self.send_data(make_query_data(["RemoteCmdLen"])) self.remote_command_count = res print(res) def set_global_speed(self): # Изменили глобальную скорость на global_speed% return make_command_data(["modifyGSPD", str(self.global_speed * 10)]) def start_cycle(self): return make_command_data(["actionSingleCycle"]) def round(v): return round(v, 3) def make_world_step(self, type, point): step = { "oneshot": "0", "delay": "0.0", "speed": str(self.line_speed), "smooth": str(self.line_smooth), "coord": "0", "tool": str(self.line_tool), "ckStatus": "0x3F", } if type == "line": pairs = zip(self.start_world_coordinates, point) m0, m1, m2, m3, m4, m5 = [round(sum(i), 3) for i in pairs] step.update({"action": "10"}) step.update({"m0": m0, "m1": m1, "m2": m2, "m3": m3, "m4": m4, "m5": m5}) step.update({"m6": 0, "m7": 0}) elif type == "curve": pairs = zip(self.start_world_coordinates, point[:5]) m0, m1, m2, m3, m4, m5 = [round(sum(i), 3) for i in pairs] pairs_p = zip(self.start_world_coordinates, point[6:]) m0_p, m1_p, m2_p, m3_p, m4_p, m5_p = [round(sum(i), 3) for i in pairs_p] step.update({"action": "17"}) step.update({"m0": m0, "m1": m1, "m2": m2, "m3": m3, "m4": m4, "m5": m5}) step.update({"m6": 0, "m7": 0}) step.update( { "m0_p": m0_p, "m1_p": m1_p, "m2_p": m2_p, "m3_p": m3_p, "m4_p": m4_p, "m5_p": m5_p, } ) step.update({"m6_p": 0, "m7_p": 0}) for s in step: step[s] = str(step[s]) return step def set_physical_speed(self, status: bool = False): return ( { "oneshot": "0", "action": "51", "isUse": str(int(status)), "speed": str(self.physical_speed * 1000), }, ) def set_output_laser(self, status: bool = False): return ( { "oneshot": "0", "action": "200", "type": "0", "io_status": str(int(status)), "point": str(self.laser_id), }, ) def steps_from_file(self): result = [] with open(f"data/{self.filename}.nc.result", "r") as fp: for line in fp: data = line.strip().split(" ") prep = {} for item in data: prep[item[:1]] = float(item[1:]) result.append( self.make_world_step( "line", ( prep.get("X", 0), prep.get("Y", 0), prep.get("Z", 0), prep.get("U", 0), prep.get("V", 0), prep.get("W", 0), ), ) ) return result SocketRobotArm()