modbus_test/robot/client_socket.py

373 lines
11 KiB
Python

import socket
import json
import os
import math
import numpy as np
from typing import Literal
from pprint import pprint
import threading
import pybullet as p
import pybullet_industrial as pi
from logger import logger
from robot.func import *
from robot.prepare_data import PrepareRobotData
from robot.urdf_manager import UrdfManager
from robot.socket_manager import SocketManager
# os.environ["LIBGL_ALWAYS_SOFTWARE"] = "1"
class SocketRobotArm:
global_speed = 10
physical_speed = 10
# laser_id = 15
laser_id = 14
filename = "pyr"
urdf_filename = "sample"
pass_size = 4
connected_status = Literal["connected", "not_connected", "error"]
_axis_coordinates = []
_world_coordinates = []
_command_count = []
command_type = "base"
command_data = None
def __init__(self, application_path, *args, **kwargs):
self.socket = None
self.host = None
self.port = 9760
self.slave_id = None
self.status: SocketRobotArm.connected_status = "not_connected"
self.q_app = None
self.imitate_point = None
robot_start_position = [0, -0, 0]
self.prepare_data = PrepareRobotData()
self.socket_manager = SocketManager()
self.urdf_manager = UrdfManager(
robot_start_position=robot_start_position, application_path=application_path
)
def __exit__(self, exc_type, exc_value, traceback):
logger.info("exiting")
self.socket.close()
def start(self, type="DIRECT"):
self.urdf_manager.run_pybullet(type)
def start_loop(self, urdf):
self.urdf_manager.start_loop(urdf)
def load_models(self):
self.urdf_manager.load_models()
self.body_id = self.urdf_manager.body_id
def get_pybullet_image(self):
return self.urdf_manager.get_pybullet_image()
def close(self):
self.socket_manager.close()
def connect(self, host, slave_id):
self.socket_manager.connect(host)
with open("delta.json", "w") as f:
pass
def get_status(self):
return self.socket_manager.status
def upd_model(self):
threading.Thread(target=self.upd_model_func, daemon=True).start()
def upd_model_func(self):
# logger.info("UPDATE MODEL FUNC")\
self.get_coordinates("axis")
self.set_text(text=f"Координаты осей {self.axis_coordinates}")
time.sleep(0.5)
self.get_coordinates("world")
self.set_text(text=f"Мировые координаты {self.world_coordinates}")
time.sleep(0.5)
self._fetch_command_count()
time.sleep(0.5)
def motionFund(i, axis):
motion_type = "reset"
try:
v = axis[i]
if v is None:
v = 0
except IndexError:
v = 0
v = np.radians(v)
if motion_type == "reset":
p.resetJointState(self.urdf_manager.body_id, i, v)
else:
p.setJointMotorControl2(
self.urdf_manager.body_id, i, p.POSITION_CONTROL, v
)
num_joints = p.getNumJoints(self.urdf_manager.body_id)
axisJointPoses = self.axis_coordinates
for i in range(num_joints - 1):
motionFund(i, axisJointPoses)
time.sleep(0.5)
coordinates = self.world_coordinates[:3]
angles = [
self.world_coordinates[1],
self.world_coordinates[2],
self.world_coordinates[3],
]
worldJointPoses = self.convert_to_joint_base(
[float(p) * 0.001 for p in coordinates], [math.radians(p) for p in angles]
)
for i in range(num_joints):
motionFund(i, np.degrees(list(worldJointPoses)))
time.sleep(2)
calcWorldPosition = p.getLinkState(
self.urdf_manager.body_id, num_joints - 1, computeForwardKinematics=True
)
calcWorldPosition = list(
[p * 1000 for p in calcWorldPosition[0]]
+ [np.degrees(p) for p in p.getEulerFromQuaternion(calcWorldPosition[1])]
)
time.sleep(0.5)
position_diff = np.linalg.norm(
self.world_coordinates[:3] - np.array(calcWorldPosition[:3])
)
print("Position Difference:", position_diff)
self.set_text(text=f"Команд в очереди {self.command_count}")
time.sleep(0.5)
def cycle_base(self):
self.upd_model()
self.send_data(self.set_global_speed())
self.set_text(text=f"Установили глобальную скорость {self.global_speed}")
time.sleep(0.5)
def cycle_start(self):
self.send_data(self.start_cycle())
self.set_text(text=f"Старт одиночного цикла")
time.sleep(0.5)
commands = (
self.steps_from_file()
if self.command_type == "base"
else self.convert_point_to_free()
)
with open(f"log/{self.command_type}.json", "w") as myfile:
myfile.write(json.dumps(commands))
self.add_rcc_list = (
[self.set_physical_speed(True), self.set_output_laser(True)]
+ commands
+ [self.set_physical_speed(False), self.set_output_laser(False)]
)
step = 4
empty = 1
for i in range(0, len(self.add_rcc_list), step):
self.command_data = f"Отправка данных {i}...{i+step-1}"
self.send_data(make_addrcc_data(self.add_rcc_list[i : i + step], empty))
empty = 0
time.sleep(0.05)
def imitate(self):
threading.Thread(target=self.imitate_func, daemon=True).start()
def set_text(self, text):
logger.info(text)
def send_data(self, data):
return self.socket_manager.send_data(data)
def get_coordinates(self, coord_type: Literal["axis", "world"] = "axis"):
request = make_query_data([f"{coord_type}-{i}" for i in range(6)])
result = self.send_data(request)
if not result:
return
data = [float(i) for i in result]
if coord_type == "axis":
self._axis_coordinates = data
elif coord_type == "world":
self._world_coordinates = data
@property
def axis_coordinates(self):
return self._axis_coordinates
@property
def world_coordinates(self):
return self._world_coordinates
def _fetch_command_count(self):
request = make_query_data(["RemoteCmdLen"])
result = self.send_data(request)
self._command_count = result
@property
def command_count(self):
return self._command_count
def set_command_type(self, data):
self.command_type = data
def get_command_type(self):
return self.command_type
def get_command_data(self):
return self.command_data
def get_imitate_point(self):
return self.imitate_point
def set_global_speed(self):
return make_command_data(["modifyGSPD", str(self.global_speed * 10)])
def start_cycle(self):
return make_command_data(["actionSingleCycle"])
def set_physical_speed(self, status: bool = False):
return (
{
"oneshot": "0",
"action": "51",
"isUse": str(int(status)),
"speed": str(self.physical_speed * 1000),
},
)
def set_output_laser(self, status: bool = False):
return (
{
"oneshot": "0",
"action": "200",
"type": "0",
"io_status": str(int(status)),
"point": str(self.laser_id),
},
)
def imitate_func(self):
points = self.convert_point_to_free()
for point in points:
jointPoses = [np.radians(float(point[f"m{i}"])) for i in range(6)]
num_joints = p.getNumJoints(self.urdf_manager.body_id)
for i in range(num_joints):
try:
val = jointPoses[i]
except Exception as e:
val = 0
# p.setJointMotorControl2(
# self.urdf_manager.body_id, i, p.POSITION_CONTROL, val
# )
p.resetJointState(self.urdf_manager.body_id, i, val)
time.sleep(0.3)
# p.setJointMotorControlArray(self.urdf_manager.body_id, p.POSITION_CONTROL, jointPoses)
def convert_point_to_free(self):
points = self.steps_from_file()
res = []
for i, point in enumerate(points):
if point["action"] == "10":
self.imitate_point = i
points = [
point["m0"],
point["m1"],
point["m2"],
]
angles = [
point["m3"],
point["m4"],
point["m5"],
]
jointPoses = self.convert_to_joint_base(
[float(p) * 0.001 for p in points], angles
)
res.append(
self.prepare_data.make_step(
"free", np.degrees(jointPoses), [0, 0, 0, 0, 0, 0]
)
)
return res
def convert_to_joint_base(self, coordinates, angles):
body_id = 0
num_joints = p.getNumJoints(body_id)
lower_limits = []
upper_limits = []
for i in range(num_joints):
limit = p.getJointInfo(body_id, i)
lower_limits.append(limit[8])
upper_limits.append(limit[9])
joint_angles = p.calculateInverseKinematics(
body_id,
endEffectorLinkIndex=num_joints - 1,
targetPosition=[float(c) for c in coordinates],
targetOrientation=angles,
# lowerLimits=lower_limits,
# upperLimits=upper_limits,
# jointRanges=[4, 4, 4, 4, 4, 4],
# restPoses=[
# math.radians(0.1873),
# math.radians(59.059),
# math.radians(-99.4105),
# math.radians(-0.2214),
# math.radians(41.7877),
# math.radians(-0.0011),
# ],
)
return joint_angles
def steps_from_file(self):
if not self.world_coordinates:
return []
result = []
with open(f"data/{self.filename}.nc.result", "r") as fp:
for line in fp:
data = line.strip().split(" ")
prep = {}
axis = ["X", "Y", "Z", "U", "V", "W"]
for item in data:
if item and item[0] and item[0] in axis:
prep[item[:1]] = float(item[1:])
if len(prep) > 0:
result.append(
self.prepare_data.make_step(
"line",
(
prep.get("X", 0),
prep.get("Y", 0),
prep.get("Z", 0),
prep.get("U", 0),
prep.get("V", 0),
prep.get("W", 0),
),
self.world_coordinates,
)
)
return result