modbus_test/robot/client_socket.py

427 lines
14 KiB
Python

import socket
import json
import os
import math
from typing import Literal
from pprint import pprint
import threading
import pybullet as p
import pybullet_data
from robot.func import *
from logger import logger
# os.environ["LIBGL_ALWAYS_SOFTWARE"] = "1"
class SocketRobotArm:
line_speed = 100.0
line_smooth = 9
line_tool = 1
global_speed = 10
physical_speed = 10
# laser_id = 15
laser_id = 14
filename = "test"
urdf_filename = "sample"
pass_size = 4
Status = Literal["connected", "not_connected", "error"]
start_axis_coordinates = []
start_world_coordinates = []
remote_command_count = []
def __init__(self, *args, **kwargs):
self.socket = None
self.host = None
self.port = 9760
self.slave_id = None
self.status: SocketRobotArm.Status = "not_connected"
self.physics_client = None
self.body_id = None
self.q_app = None
self.imitate_point = None
def __exit__(self, exc_type, exc_value, traceback):
print("exiting")
self.socket.close()
def start(self, type="DIRECT"):
logger.info(type)
self.run_pybullet(type)
def start_loop(self, urdf):
self.urdf_filename = urdf
p.resetSimulation()
self.load_models()
def run_pybullet(self, type="DIRECT"):
self.physics_client = p.connect(getattr(p, type))
logger.info(f"Connect to {self.physics_client} by {type}")
p.setGravity(0, 0, -9.81, physicsClientId=self.physics_client)
p.setAdditionalSearchPath(pybullet_data.getDataPath())
p.configureDebugVisualizer(p.COV_ENABLE_RENDERING, 0)
self.simulation_loop()
def load_models(self):
p.loadURDF("plane.urdf", physicsClientId=self.physics_client)
urdf_path = os.path.join("urdf", f"{self.urdf_filename}.urdf")
self.body_id = p.loadURDF(
urdf_path, [0, 0, 0], useFixedBase=1, physicsClientId=self.physics_client
)
time.sleep(1)
# Получение AABB
aabb = p.getAABB(self.body_id)
# Вычисление размеров модели
size_x = aabb[1][0] - aabb[0][0] # Длина по оси X
size_y = aabb[1][1] - aabb[0][1] # Длина по оси Y
size_z = aabb[1][2] - aabb[0][2] # Длина по оси Z
print(
f"Размеры модели (длина x, длина y, длина z): ({size_x}, {size_y}, {size_z})"
)
def simulation_loop(self):
logger.info(f"Начат цикл симуляции")
while True:
# bodyUniqueId = self.body_id
# position, orientation = p.getBasePositionAndOrientation(bodyUniqueId)
# # logger.info(f"Position: {position}, Orientation: {orientation}")
# # Получение состояния суставов
# num_joints = p.getNumJoints(bodyUniqueId)
# joint_states = p.getJointStates(bodyUniqueId, range(num_joints))
# joint_log = []
# for i, state in enumerate(joint_states):
# joint_position = state[0] # Угол
# joint_velocity = state[1] # Угловая скорость
# joint_log.append(
# f"Joint {i} - Position: {joint_position}, Velocity: {joint_velocity}"
# )
# logger.info("\n".join(joint_log))
p.stepSimulation()
time.sleep(1 / 240)
def get_pybullet_image(self):
if self.physics_client is None:
return
width, height, rgb, _, _ = p.getCameraImage(
width=640,
height=640,
viewMatrix=p.computeViewMatrix(
cameraEyePosition=[2, 2, 2],
cameraTargetPosition=[0, 0, 0],
cameraUpVector=[0, 0, 1],
),
projectionMatrix=p.computeProjectionMatrixFOV(
fov=60.0, aspect=1.0, nearVal=0.1, farVal=10.0
),
renderer=p.ER_TINY_RENDERER,
physicsClientId=self.physics_client,
)
return (rgb, width, height)
def close(self):
self.socket.close()
self.socket = None
self.status = "not_connected"
self.slave_id = None
def connect(self, host, slave_id):
self.host = host
self.slave_id = slave_id
if self.socket is None:
self.socket = socket.socket()
self.socket.connect((self.host, self.port))
self.status = "connected"
def get_status(self):
return self.status
def upd_model(self):
logger.info("UPDATE")
self.get_axis()
self.set_text(text=f"Координаты осей {self.start_axis_coordinates}")
time.sleep(0.5)
self.set_joint([math.radians(c) for c in self.start_axis_coordinates])
self.get_world()
self.set_text(text=f"Мировые координаты {self.start_world_coordinates}")
time.sleep(0.5)
self.set_joint(self.convert_to_joint(self.start_world_coordinates[0:3]))
self._get_command_count()
self.set_text(text=f"Команд в очереди {self.remote_command_count}")
time.sleep(0.5)
def cycle_base(self):
self.upd_model()
self.send_data(self.set_global_speed())
self.set_text(text=f"Установили глобальную скорость {self.global_speed}")
time.sleep(0.5)
def cycle_start(self):
self.send_data(self.start_cycle())
self.set_text(text=f"Старт одиночного цикла")
time.sleep(0.5)
self.add_rcc_list = (
[self.set_physical_speed(True), self.set_output_laser(True)]
+ self.steps_from_file()
# + self.convert_file_to_join()
+ [self.set_physical_speed(False), self.set_output_laser(False)]
)
step = 4
empty = 1
for i in range(0, len(self.add_rcc_list), step):
if not self.socket:
return
print(f"Отправка данных {i}...{i+step-1}")
self.send_data(make_addrcc_data(self.add_rcc_list[i : i + step], empty))
empty = 0
time.sleep(0.05)
def imitate(self):
points = self.steps_from_file()
for i, p in enumerate(points):
if p["action"] == "10":
self.imitate_point = i
angles = self.convert_to_joint(
[float(p) for p in [p["m0"], p["m1"], p["m2"]]]
)
self.set_joint(angles)
time.sleep(0.5)
def convert_to_joint(self, coordinates):
num_joints = p.getNumJoints(self.body_id)
joint_info = [p.getJointInfo(self.body_id, i) for i in range(num_joints)]
joint_angles = p.calculateInverseKinematics(
self.body_id,
endEffectorLinkIndex=num_joints - 1,
targetPosition=coordinates,
)
print(joint_angles)
logger.info(joint_angles)
def set_text(self, text):
logger.info(text)
def set_joint(self, coordinates):
logger.info("set joints")
num_joints = p.getNumJoints(self.body_id)
if coordinates is None:
return
for joint_index in range(0, num_joints):
if len(coordinates) <= joint_index or coordinates[joint_index] is None:
return
p.setJointMotorControl2(
bodyUniqueId=self.body_id,
jointIndex=joint_index,
controlMode=p.POSITION_CONTROL,
targetPosition=coordinates[joint_index],
)
time.sleep(0.01)
def send_data(self, data):
if not self.socket:
return
self.socket.send(str.encode(json.dumps(data)))
response_data = self.socket.recv(1024)
response = json.loads(response_data)
if data["reqType"] == "query":
return response["queryData"]
elif data["reqType"] == "command":
return response["cmdReply"]
elif data["reqType"] == "AddRCC" and "cmdReply" in response.keys():
return response["cmdReply"]
else:
pprint(response)
def get_axis(self):
axis_coord_raw = self.send_data(
make_query_data(
["axis-0", "axis-1", "axis-2", "axis-3", "axis-4", "axis-5"]
)
)
self.start_axis_coordinates = [float(i) for i in axis_coord_raw]
print("start_axis_coordinates", self.start_axis_coordinates)
def get_axis_coordinates(self):
return self.start_axis_coordinates
def get_world(self):
world_coord_raw = self.send_data(
make_query_data(
["world-0", "world-1", "world-2", "world-3", "world-4", "world-5"]
)
)
self.start_world_coordinates = [float(i) for i in world_coord_raw]
print("start_world_coordinates", self.start_world_coordinates)
def get_world_coordinates(self):
return self.start_world_coordinates
def _get_command_count(self):
res = self.send_data(make_query_data(["RemoteCmdLen"]))
self.remote_command_count = res
print(res)
def get_command_count(self):
return self.remote_command_count
def get_imitate_point(self):
return self.imitate_point
def set_global_speed(self):
# Изменили глобальную скорость на global_speed%
return make_command_data(["modifyGSPD", str(self.global_speed * 10)])
def start_cycle(self):
return make_command_data(["actionSingleCycle"])
def round(v):
return round(v, 3)
def make_world_step(self, type, point):
step = {
"oneshot": "1",
"delay": "0.0",
"speed": str(self.line_speed),
"smooth": str(self.line_smooth),
"coord": "0",
"tool": str(self.line_tool),
"ckStatus": "0x3F",
}
if type == "line" or type == "free":
if type == "line":
pairs = zip(self.start_world_coordinates, point)
if type == "free":
pairs = zip(self.start_axis_coordinates, point)
m0, m1, m2, m3, m4, m5 = [round(sum(i), 3) for i in pairs]
if type == "line":
step.update({"action": "10"})
if type == "free":
step.update({"action": "4"})
step.update({"m0": m0, "m1": m1, "m2": m2, "m3": m3, "m4": m4, "m5": m5})
step.update({"m6": 0, "m7": 0})
elif type == "curve":
pairs = zip(self.start_world_coordinates, point[:5])
m0, m1, m2, m3, m4, m5 = [round(sum(i), 3) for i in pairs]
pairs_p = zip(self.start_world_coordinates, point[6:])
m0_p, m1_p, m2_p, m3_p, m4_p, m5_p = [round(sum(i), 3) for i in pairs_p]
step.update({"action": "17"})
step.update({"m0": m0, "m1": m1, "m2": m2, "m3": m3, "m4": m4, "m5": m5})
step.update({"m6": 0, "m7": 0})
step.update(
{
"m0_p": m0_p,
"m1_p": m1_p,
"m2_p": m2_p,
"m3_p": m3_p,
"m4_p": m4_p,
"m5_p": m5_p,
}
)
step.update({"m6_p": 0, "m7_p": 0})
for s in step:
step[s] = str(step[s])
return step
def set_physical_speed(self, status: bool = False):
return (
{
"oneshot": "0",
"action": "51",
"isUse": str(int(status)),
"speed": str(self.physical_speed * 1000),
},
)
def set_output_laser(self, status: bool = False):
return (
{
"oneshot": "0",
"action": "200",
"type": "0",
"io_status": str(int(status)),
"point": str(self.laser_id),
},
)
def convert_file_to_join(self):
result = []
with open(f"data/{self.filename}.nc.result", "r") as fp:
for line in fp:
data = line.strip().split(" ")
prep = {}
for item in data:
prep[item[:1]] = float(item[1:])
pj = list(
self.convert_to_joint(
(
prep.get("X", 0),
prep.get("Y", 0),
prep.get("Z", 0),
)
)
)
pj.extend(
[
prep.get("U", 0),
prep.get("V", 0),
prep.get("W", 0),
]
),
print(pj)
result.append(self.make_world_step("free", tuple(pj)))
return result
def steps_from_file(self):
result = []
with open(f"data/{self.filename}.nc.result", "r") as fp:
for line in fp:
data = line.strip().split(" ")
prep = {}
for item in data:
prep[item[:1]] = float(item[1:])
result.append(
self.make_world_step(
"line",
(
prep.get("X", 0),
prep.get("Y", 0),
prep.get("Z", 0),
prep.get("U", 0),
prep.get("V", 0),
prep.get("W", 0),
),
)
)
return result