modbus_test/client_socket.py

311 lines
9.9 KiB
Python

import socket
import json
import os
import sys
import math
from pprint import pprint
import tkinter
import threading
import pybullet as p
import pybullet_data
os.environ["LIBGL_ALWAYS_SOFTWARE"] = "1"
from func import *
class SocketRobotArm:
line_speed = 100.0
line_smooth = 9
line_tool = 1
global_speed = 100
physical_speed = 50
# laser_id = 15
laser_id = 14
# filename = 'half-sphere-no-angle'
filename = "half-sphere"
urdf_filename = "sample"
pass_size = 4
def __init__(self, *args, **kwargs):
self.socket = socket.socket()
self.host = MODBUS_SERVER_HOST
self.port = 9760
if len(sys.argv) and "--test" in sys.argv:
self.host = "127.0.0.1"
self.port = 65432
self.tkinter_root = tkinter.Tk()
self.tkinter_info_label = tkinter.Label(self.tkinter_root)
self.tkinter_exit = tkinter.Button(
self.tkinter_root, text="Exit", command=self.close
)
self.tkinter_start_cycle = tkinter.Button(
self.tkinter_root, text="Start cycle", command=self.cycle_start
)
self.physics_client = None
self.body_id = None
threading.Thread(target=self.run_pybullet, daemon=True).start()
self.socket.connect((self.host, self.port))
self.cycle_base()
self.tkinter_root.mainloop()
def __exit__(self, exc_type, exc_value, traceback):
print("exiting")
self.socket.close()
def run_pybullet(self):
self.physics_client = p.connect(p.GUI)
p.setGravity(0, 0, -9.81, physicsClientId=self.physics_client)
p.setAdditionalSearchPath(pybullet_data.getDataPath())
self.body_id = p.loadURDF(
f"urdf/{self.urdf_filename}.urdf",
[0, 0, 0],
useFixedBase=True,
globalScaling=2,
)
while True:
p.stepSimulation()
time.sleep(1 / 24)
def close(self):
print("close")
# self.tkinter_info_label.destroy()
self.tkinter_root.quit()
self.socket.close()
self.socket = None
# sys.exit()
def cycle_base(self):
self.tkinter_root.geometry("500x300")
self.tkinter_start_cycle.pack(pady=20)
self.tkinter_exit.pack(pady=20)
self.get_axis()
self.set_text(text=f"Координаты осей {self.start_axis_coordinates}")
time.sleep(0.5)
self.set_joint([math.radians(c) for c in self.start_axis_coordinates])
self.get_world()
self.set_text(text=f"Мировые координаты {self.start_world_coordinates}")
time.sleep(0.5)
self.set_joint(self.convert_to_joint(self.start_world_coordinates[0:3]))
self.get_command_count()
self.set_text(text=f"Команд в очереди {self.remote_command_count}")
time.sleep(0.5)
self.send_data(self.set_global_speed())
self.set_text(text=f"Установили глобальную скорость {self.global_speed}")
time.sleep(0.5)
def cycle_start(self):
self.send_data(self.start_cycle())
self.set_text(text=f"Старт одиночного цикла")
time.sleep(0.5)
self.add_rcc_list = (
[self.set_physical_speed(True), self.set_output_laser(True)]
+ self.steps_from_file()
+ [self.set_physical_speed(False), self.set_output_laser(False)]
)
self.tkinter_info_label.config(text=f"Отправка данных...")
self.tkinter_info_label.pack()
self.tkinter_root.update()
step = 2
for i in range(0, len(self.add_rcc_list), step):
if not self.tkinter_info_label:
return
if not self.socket:
return
print(f"Отправка данных {i}...{i+step-1}")
# self.update_text(m, text=f"Отправка данных {i}...{i+step-1}")
self.tkinter_info_label.config(text=f"Отправка данных {i}...{i+step-1}")
self.tkinter_root.update()
self.send_data(
make_addrcc_data(self.add_rcc_list[i : i + step], int(not i))
)
time.sleep(0.5)
def convert_to_joint(self, coordinates):
num_joints = p.getNumJoints(self.body_id)
joint_info = [p.getJointInfo(self.body_id, i) for i in range(num_joints)]
joint_angles = p.calculateInverseKinematics(
self.body_id,
endEffectorLinkIndex=num_joints - 1,
targetPosition=coordinates,
)
print(joint_angles)
return joint_angles
def set_joint(self, coordinates):
num_joints = p.getNumJoints(self.body_id)
for joint_index in range(0, num_joints):
p.setJointMotorControl2(
bodyUniqueId=self.body_id,
jointIndex=joint_index,
controlMode=p.POSITION_CONTROL,
targetPosition=coordinates[joint_index],
)
self.tkinter_info_label.config(
text=f"Ось {joint_index} повернуть на {coordinates[joint_index]} градусов"
)
self.tkinter_info_label.pack()
self.tkinter_root.update()
time.sleep(0.5)
def set_text(self, text):
label = tkinter.Label(self.tkinter_root, text=text)
label.pack()
self.tkinter_root.update()
def send_data(self, data):
if not self.socket:
return
self.socket.send(str.encode(json.dumps(data)))
response_data = self.socket.recv(1024)
response = json.loads(response_data)
if data["reqType"] == "query":
return response["queryData"]
elif data["reqType"] == "command":
return response["cmdReply"]
elif data["reqType"] == "AddRCC" and "cmdReply" in response.keys():
return response["cmdReply"]
else:
pprint(response)
def get_axis(self):
axis_coord_raw = self.send_data(
make_query_data(
["axis-0", "axis-1", "axis-2", "axis-3", "axis-4", "axis-5"]
)
)
self.start_axis_coordinates = [float(i) for i in axis_coord_raw]
print("start_axis_coordinates", self.start_axis_coordinates)
def get_world(self):
world_coord_raw = self.send_data(
make_query_data(
["world-0", "world-1", "world-2", "world-3", "world-4", "world-5"]
)
)
self.start_world_coordinates = [float(i) for i in world_coord_raw]
print("start_world_coordinates", self.start_world_coordinates)
def get_command_count(self):
res = self.send_data(make_query_data(["RemoteCmdLen"]))
self.remote_command_count = res
print(res)
def set_global_speed(self):
# Изменили глобальную скорость на global_speed%
return make_command_data(["modifyGSPD", str(self.global_speed * 10)])
def start_cycle(self):
return make_command_data(["actionSingleCycle"])
def round(v):
return round(v, 3)
def make_world_step(self, type, point):
step = {
"oneshot": "0",
"delay": "0.0",
"speed": str(self.line_speed),
"smooth": str(self.line_smooth),
"coord": "0",
"tool": str(self.line_tool),
"ckStatus": "0x3F",
}
if type == "line":
pairs = zip(self.start_world_coordinates, point)
m0, m1, m2, m3, m4, m5 = [round(sum(i), 3) for i in pairs]
step.update({"action": "10"})
step.update({"m0": m0, "m1": m1, "m2": m2, "m3": m3, "m4": m4, "m5": m5})
step.update({"m6": 0, "m7": 0})
elif type == "curve":
pairs = zip(self.start_world_coordinates, point[:5])
m0, m1, m2, m3, m4, m5 = [round(sum(i), 3) for i in pairs]
pairs_p = zip(self.start_world_coordinates, point[6:])
m0_p, m1_p, m2_p, m3_p, m4_p, m5_p = [round(sum(i), 3) for i in pairs_p]
step.update({"action": "17"})
step.update({"m0": m0, "m1": m1, "m2": m2, "m3": m3, "m4": m4, "m5": m5})
step.update({"m6": 0, "m7": 0})
step.update(
{
"m0_p": m0_p,
"m1_p": m1_p,
"m2_p": m2_p,
"m3_p": m3_p,
"m4_p": m4_p,
"m5_p": m5_p,
}
)
step.update({"m6_p": 0, "m7_p": 0})
for s in step:
step[s] = str(step[s])
return step
def set_physical_speed(self, status: bool = False):
return (
{
"oneshot": "0",
"action": "51",
"isUse": str(int(status)),
"speed": str(self.physical_speed * 1000),
},
)
def set_output_laser(self, status: bool = False):
return (
{
"oneshot": "0",
"action": "200",
"type": "0",
"io_status": str(int(status)),
"point": str(self.laser_id),
},
)
def steps_from_file(self):
result = []
with open(f"data/{self.filename}.nc.result", "r") as fp:
for line in fp:
data = line.strip().split(" ")
prep = {}
for item in data:
prep[item[:1]] = float(item[1:])
result.append(
self.make_world_step(
"line",
(
prep.get("X", 0),
prep.get("Y", 0),
prep.get("Z", 0),
prep.get("U", 0),
prep.get("V", 0),
prep.get("W", 0),
),
)
)
return result
SocketRobotArm()